欢迎来到飞桨分布式技术文档主页¶
欢迎您关注飞桨分布式训练,我们希望能帮助每一个用户走上大规模工业化生产之路!
整体介绍与内容概览¶
欢迎关注大规模深度学习技术¶
近十年来,深度学习技术不断刷新视觉、自然语言、语音、搜索、推荐等领域各种任务的记录。这其中的原因,用一个关键词描述就是“大规模”。大规模的数据使得模型有足够的知识可以记忆,大规模参数量的模型使得模型本身有能力记忆更多的数据,大规模高性能的算力(以GPU为典型代表)使得模型的训练速度有百倍甚至千倍的提升。数据、模型、算力的发展催生了大规模深度学习这个领域,如何进行多机任务的拆分、如何配置集群训练资源、如何平衡训练速度和收敛速度、如何训练单机无法训练的模型、弹性训练与容错等都是这个方向重点研究的问题。
飞桨分布式训练提供的核心价值¶
源自产业实践的经验:
飞桨的分布式训练技术源自百度的业务实践,是经过超大规模业务数据检验过的训练框架。
飞桨分布式训练经过实践检验的应用领域包括自然语言处理,计算机视觉,搜索,推荐等。
完备的并行模式:
数据并行:针对产业界最常用的数据并行模式,飞桨针对实际业务需求重点打磨多项技术,包括;飞桨提供集合通信架构和参数服务器架构两种方式,支持工业实践中常见的同步训练和异步训练的机制,并提供收敛效果有保障的分布式优化算法。
流水线并行:面向异构硬件,流水线并行能够将模型计算部分拆分到不同硬件并充分流水线化,从而大规模提升异构硬件的整体利用率。
模型并行:对于超大规模分类问题,飞桨提供计算与存储同时并行的模型并行,解决单GPU无法解决的问题。
面向云端场景的并行训练组件:
飞桨针对集群网络环境、硬件设备比较低配的场景提供多种实用的并行策略和优化算法。
针对云端算力具有弹性的特点,飞桨也始终在探索弹性深度学习的应用。
开始你的分布式训练之旅¶
整体内容:我们推荐您直接根据主页,按照章节顺序逐个浏览学习,如果有任何疑问都可以在Paddle、FleetX提交issue提问
FAQ:对于高频出现的问题,我们会定期整理相关内容到FAQ
快速上手:如果想最低成本的了解飞桨的分布式训练,我们推荐阅读静态图并行训练快速开始和动态图并行训练快速开始
GPU多机训练:如果您已经开始使用GPU进行多机多卡训练,数据并行同步训练实践是很好的参考。
参数服务器:信息检索、推荐系统领域常用的并行训练方式,可以参考使用Fleet进行参数服务器训练。
公有云环境实践:如果您在公有云上跑自己的GPU多卡任务,性能不佳,优化低配网络的分布式GPU训练是调优性能的好方法
弹性训练:如果对如何利用云端弹性资源进行大规模蒸馏训练有兴趣,可以阅读EDL 服务型弹性蒸馏
RoadMap¶
我们也会推送大规模深度学习技术领域最前沿的技术到这里
近期:千亿规模模型参数的GPU多机多卡训练,敬请期待
安装Paddle与FleetX¶
Paddle¶
使用飞桨进行分布式训练的最小安装集合就是安装Paddle。从Paddle 2.0版本开始,我们面向不同用户群体提供不同类型的分布式训练API。
面向算法工程师为主的高级API paddle.distributed.fleet。
面向具有分布式训练底层工程开发能力的工程师提供的API paddle.distributed。
您只需要安装Paddle,就可以获得飞桨团队官方提供的所有分布式训练功能。
pip install paddlepaddle-gpu
关于安装Paddle,这里 有更完备的安装指南供您参考。
FleetX¶
更大规模的数据、能够记忆并泛化大数据的模型、超大规模算力是利用深度学习技术提升业务效果的有效方法。我们针对需要大规模数据、大容量模型并需要进行高性能分布式训练的应用场景,开发了 FleetX 工具包。
在数据维度,提供用户可以快速定义的标准公开数据集以及低门槛替换自己业务数据集的接口。
在模型维度,FleetX提供典型的分布式训练场景下最常用的标准模型供用户直接使用,例如标准的预训练模型Resnet50、Ernie、Bert等。
在利用大规模算力集群方面,FleetX使用Paddle原生提供的分布式训练能力,面向不同的模型提供最佳的分布式训练实践,在保证收敛效果的前提下最大化用户的集群使用效率。
pip install fleet-x==0.0.7
或者使用我们为用户提供了已经编译好的安装包,可以下载到本机后下载:
# python2
wget --no-check-certificate https://fleet.bj.bcebos.com/fleet_x-0.0.7-py2-none-any.whl
pip install fleet_x-0.0.4-py2-none-any.whl
# python3
wget --no-check-certificate https://fleet.bj.bcebos.com/fleet_x-0.0.7-py3-none-any.whl
pip3 install fleet_x-0.0.4-py3-none-any.whl
静态图分布式训练快速开始¶
对于大部分用户来讲,数据并行训练基本可以解决实际业务中的训练要求。我们以一个非常简单的神经网络为例,介绍如何使用飞桨高级分布式API
paddle.distributed.fleet
进行数据并行训练。在数据并行方式下,通常可以采用两种架构进行并行训练,即集合通信训练(Collective
Training)和参数服务器训练(Parameter Server
Training),接下来的例子会以同样的模型来说明两种架构的数据并行是如何实现的。
版本要求¶
paddlepaddle-2.0.0-rc-cpu / paddlepaddle-2.0.0-rc-gpu及以上
模型描述¶
为了方便说明,我们采用两层全连接网络的分类模型,并使用CrossEntropyLoss
来评价模型是否优化的符合目标,数据方面我们采用Paddle
内置的Mnist
数据集,存放在model.py
import paddle
import paddle.static.nn as nn
paddle.enable_static()
def mnist_on_mlp_model():
train_dataset = paddle.vision.datasets.MNIST(mode='train')
test_dataset = paddle.vision.datasets.MNIST(mode='test')
x = paddle.data(name="x", shape=[64, 1, 28, 28], dtype='float32')
y = paddle.data(name="y", shape=[64, 1], dtype='int64')
x_flatten = paddle.reshape(x, [64, 784])
fc_1 = nn.fc(input=x_flatten, size=128, act='tanh')
fc_2 = nn.fc(input=fc_1, size=128, act='tanh')
prediction = nn.fc(input=[fc_2], size=10, act='softmax')
cost = paddle.fluid.layers.cross_entropy(input=prediction, label=y)
acc_top1 = paddle.fluid.layers.accuracy(input=prediction, label=y, k=1)
avg_cost = paddle.fluid.layers.mean(x=cost)
return train_dataset, test_dataset, x, y, avg_cost, acc_top1
采用GPU多机多卡进行同步训练¶
collective_trainer.py
import os
import paddle
import paddle.distributed.fleet as fleet
from model import mnist_on_mlp_model
train_data, test_data, x, y, cost, acc = mnist_on_mlp_model()
place = paddle.CUDAPlace(int(os.environ.get('FLAGS_selected_gpus', 0)))
train_dataloader = paddle.io.DataLoader(
train_data, feed_list=[x, y], drop_last=True,
places=place, batch_size=64, shuffle=True)
fleet.init(is_collective=True)
strategy = fleet.DistributedStrategy()
#optimizer = paddle.optimizer.Adam(learning_rate=0.01)
optimizer = paddle.fluid.optimizer.Adam(learning_rate=0.001)
optimizer = fleet.distributed_optimizer(optimizer, strategy=strategy)
optimizer.minimize(cost)
exe = paddle.static.Executor(place)
exe.run(paddle.static.default_startup_program())
epoch = 10
step = 0
for i in range(epoch):
for data in train_dataloader():
step += 1
loss_val, acc_val = exe.run(
paddle.static.default_main_program(),
feed=data, fetch_list=[cost.name, acc.name])
单机四卡训练启动命令
fleetrun --gpus 0,1,2,3 collective_trainer.py
采用参数服务器进行多机训练¶
parameter_server_trainer.py
import paddle
import paddle.distributed.fleet as fleet
from model import mnist_on_mlp_model
paddle.enable_static()
train_data, test_data, x, y, cost, acc = mnist_on_mlp_model()
fleet.init()
strategy = fleet.DistributedStrategy()
strategy.a_sync = True
optimizer = paddle.fluid.optimizer.Adam(learning_rate=0.001)
optimizer = fleet.distributed_optimizer(optimizer, strategy)
optimizer.minimize(cost)
if fleet.is_server():
fleet.init_server()
fleet.run_server()
else:
place = paddle.CPUPlace()
exe = paddle.static.Executor(place)
exe.run(paddle.static.default_startup_program())
fleet.init_worker()
train_dataloader = paddle.io.DataLoader(
train_data, feed_list=[x, y], drop_last=True, places=place,
batch_size=64, shuffle=True)
epoch = 1
for i in range(epoch):
for data in train_dataloader():
cost_val, acc_val = exe.run(
paddle.static.default_main_program(),
feed=data, fetch_list=[cost.name, acc.name])
print("loss: {}, acc: {}".format(cost_val, acc_val))
fleet.stop_worker()
两节点Server,两节点Worker的启动命令
fleetrun --worker_num 2 --server_num 2 parameter_server_trainer.py
动态图分布式训练快速开始¶
Paddle官方文档中对动态图(命令式编程)做了比较详细的介绍。Paddle的分布式高级API paddle.distributed.fleet
接口从Paddle
2.0-RC版本开始支持动态图分布式任务执行。本篇文章我们将介绍如何使用 paddle.distributed.fleet
接口进行动态图分布式训练。接下来我们以一个简单全连接网络实例为例,说明如何将单机单卡训练改成分布式单机多卡训练,再到多机多卡训练。
注:目前paddle.distributed.fleet
启动动态图分布式训练仅支持集合通信(Colletive Communication)模式,不支持参数服务器(Parameter-Server)模式。本文示例为集合通信模式任务。
版本要求¶
paddlepaddle 2.0-rc-gpu版本及以上
单机单卡训练¶
下面是一个非常简单的动态图单机单卡程序。网络只有只有2层全连接层,用均方差误差(MSELoss)作为损失函数,Adam优化器进行参数的更新。用随机产生的数据进行训练,循环迭代20轮中,每轮打印出当前网络具体的损失值。
# -*- coding: UTF-8 -*-
import paddle
import paddle.nn as nn
# 定义全连接网络,需继承自nn.Layer
class LinearNet(nn.Layer):
def __init__(self):
super(LinearNet, self).__init__()
self._linear1 = nn.Linear(10, 10)
self._linear2 = nn.Linear(10, 1)
def forward(self, x):
return self._linear2(self._linear1(x))
# 1.开启动态图模式
paddle.disable_static()
# 2. 定义网络对象,损失函数和优化器
layer = LinearNet()
loss_fn = nn.MSELoss()
adam = paddle.optimizer.Adam(
learning_rate=0.001, parameters=layer.parameters())
for step in range(20):
# 3. 执行前向网络
inputs = paddle.randn([10, 10], 'float32')
outputs = layer(inputs)
labels = paddle.randn([10, 1], 'float32')
loss = loss_fn(outputs, labels)
print("step:{}\tloss:{}".format(step, loss.numpy()))
# 4. 执行反向计算和参数更新
loss.backward()
adam.step()
adam.clear_grad()
将以上代码保存为train_single.py
,运行python train_single.py
,您将看到显示如下日志信息:
step:0 loss:[1.2709768]
step:1 loss:[0.7705929]
step:2 loss:[2.2044802]
step:3 loss:[1.6021421]
step:4 loss:[2.0286825]
step:5 loss:[0.7866151]
step:6 loss:[1.926115]
step:7 loss:[0.3647427]
...
单机多卡训练¶
使用Fleet接口进行动态图分布式训练其实非常的简单,只需修改3个步骤:
导入
paddle.distributed.fleet
包
from paddle.distributed import fleet
初始化fleet环境
fleet.init(is_collective=True)
通过fleet获取分布式优化器和分布式模型
strategy = fleet.DistributedStrategy()
adam = fleet.distributed_optimizer(adam, strategy=strategy)
dp_layer = fleet.distributed_model(layer)
说明:目前静态图DistributedStrategy
下的分布式策略正逐步向动态图场景迁移中,敬请期待!
根据我们最开始提供的单机单卡代码示例,再根据3步口诀进行修改,完整的单机多卡示例代码如下:
# -*- coding: UTF-8 -*-
import paddle
import paddle.nn as nn
#分布式step 1: 导入paddle.distributed.fleet包
from paddle.distributed import fleet
# 定义全连接网络,需继承自nn.Layer
class LinearNet(nn.Layer):
def __init__(self):
super(LinearNet, self).__init__()
self._linear1 = nn.Linear(10, 10)
self._linear2 = nn.Linear(10, 1)
def forward(self, x):
return self._linear2(self._linear1(x))
# 1.开启动态图模式
paddle.disable_static()
# 分布式step 2: 初始化fleet
fleet.init(is_collective=True)
# 2. 定义网络对象,损失函数和优化器
layer = LinearNet()
loss_fn = nn.MSELoss()
adam = paddle.optimizer.Adam(
learning_rate=0.001, parameters=layer.parameters())
# 分布式step 3: 通过fleet获取分布式优化器和分布式模型
strategy = fleet.DistributedStrategy()
adam = fleet.distributed_optimizer(adam, strategy=strategy)
dp_layer = fleet.distributed_model(layer)
for step in range(20):
# 3. 执行前向网络
inputs = paddle.randn([10, 10], 'float32')
outputs = dp_layer(inputs)
labels = paddle.randn([10, 1], 'float32')
loss = loss_fn(outputs, labels)
print("step:{}\tloss:{}".format(step, loss.numpy()))
# 4. 执行反向计算和参数更新
loss.backward()
adam.step()
adam.clear_grad()
将以上代码保存为train_fleet.py
,假设要运行2卡的任务,那么只需在命令行中执行:
fleetrun --gpus=0,1 dygraph_fleet.py
您将看到显示如下日志信息:
----------- Configuration Arguments -----------
gpus: 0,1
ips: 127.0.0.1
log_dir: log
server_num: None
servers:
training_script: dygraph_fleet.py
training_script_args: []
worker_num: None
workers:
------------------------------------------------
INFO 2020-0X-XX 08:33:30,247 launch.py:441] Run collective gpu mode. gpu arguments:['--gpus'], cuda count:8
INFO 2020-0X-XX 08:33:30,247 launch_utils.py:430] Local start 2 processes. First process distributed environment info (Only For Debug):
+=======================================================================================+
| Distributed Envs Value |
+---------------------------------------------------------------------------------------+
| PADDLE_CURRENT_ENDPOINT 127.0.0.1:59664 |
| PADDLE_TRAINERS_NUM 2 |
| FLAGS_selected_gpus 0 |
| PADDLE_TRAINER_ENDPOINTS 127.0.0.1:59664,127.0.0.1:48993 |
| PADDLE_TRAINER_ID 0 |
+=======================================================================================+
step:0 loss:[1.3279431]
step:1 loss:[2.5023699]
step:2 loss:[3.3197324]
step:3 loss:[2.6869867]
step:4 loss:[2.6306524]
step:5 loss:[1.9267073]
step:6 loss:[1.2037501]
step:7 loss:[1.1434236]
...
完整2卡的日志信息也可在./log/
目录下查看。了解更多fleetrun
的用法可参考左侧文档fleetrun 启动分布式任务
。
多机多卡训练¶
从单机多卡到多机多卡训练,在代码上并不需要做任何改动,只需修改启动命令,以2机4卡为例:
fleetrun --ips="xx.xx.xx.xx,yy.yy.yy.yy" --gpus=0,1 dygraph_fleet.py
在2台机器上分别运行以上启动命令,fleetrun
将在后台分别启动2个多进程任务,执行分布式多机训练。
您将在ip为xx.xx.xx.xx的机器上看到命令台输出日志信息:
----------- Configuration Arguments -----------
gpus: None
ips: xx.xx.xx.xx,yy.yy.yy.yy
log_dir: log
server_num: None
servers:
training_script: dygraph_fleet.py
training_script_args: []
worker_num: None
workers:
------------------------------------------------
INFO 2020-0X-XX 21:29:41,918 launch.py:434] Run collective gpu mode. gpu arguments:['--ips'], cuda count:2
INFO 2020-0X-XX 21:29:41,919 launch_utils.py:426] Local start 2 processes. First process distributed environment info (Only For Debug):
+=======================================================================================+
| Distributed Envs Value |
+---------------------------------------------------------------------------------------+
| PADDLE_CURRENT_ENDPOINT xx.xx.xx.xx:6070 |
| PADDLE_TRAINERS_NUM 4 |
| FLAGS_selected_gpus 0 |
| PADDLE_TRAINER_ENDPOINTS ... :6071,yy.yy.yy.yy:6070,yy.yy.yy.yy:6071|
| PADDLE_TRAINER_ID 0 |
+=======================================================================================+
step:0 loss:[5.2519045]
step:1 loss:[3.139771]
step:2 loss:[2.0075738]
step:3 loss:[1.4674551]
step:4 loss:[4.0751777]
step:5 loss:[2.6568782]
step:6 loss:[1.1998112]
...
同样完整的日志信息也分别在xx.xx.xx.xx机器和yy.yy.yy.yy机器上的./log/
目录下查看。
小结¶
至此,相信您已经通过3步口诀掌握了如何将一个普通的paddle动态图单卡任务转换为多卡任务。推荐使用单卡进行调试,真正执行训练时切换为多卡任务。我们也将在未来继续完善Fleet动态图模块,通过与静态图类似的方式实现分布式训练任务在不同场景下的优化,敬请期待!
使用fleetrun启动分布式任务¶
Paddle提供命令行启动命令fleetrun
,配合Paddle的分布式高级APIpaddle.distributed.fleet
即可轻松启动Paddle集合通信模式或参数服务器模式下的分布式任务。
fleetrun
在静态图和动态图场景下均可使用。
内容导航¶
使用说明¶
fleetrun
使用场景主要分为集合通信训练(Collective
Training)和参数服务器训练(Parameter Server
Training)。
集合通信训练一般在GPU设备上运行,因此我们将介绍GPU单机单卡,单机多卡和多机多卡场景下使用fleetrun
的方法。
参数服务器训练包含服务节点、训练节点以及异构训练节点的启动,
因此我们将介绍在CPU集群、GPU集群上和异构集群上如何使用fleetrun
启动分布式训练 。fleetrun
支持在百度公司内部云PaddleCloud上运行分布式任务,推荐结合fleetsub
命令,一键快速提交集群任务。详情请参考使用fleetsub提交集群任务。
你也可以使用 python -m paddle.distributed.launch
来启动训练任务,事实上, fleetrun
是前者的快捷方式。
集合通信训练¶
GPU单机单卡训练
单机单卡有两种方式:一种可直接使用python
执行,也可以使用fleetrun
执行。推荐使用fleetrun
启动方法。
【方法一】直接使用python
执行
export CUDA_VISIBLE_DEVICES=0
python train.py
【方法二】使用fleetrun
执行
fleetrun --gpus=0 train.py
注:如果指定了export CUDA_VISIBLE_DEVICES=0
,则可以直接使用:
export CUDA_VISIBLE_DEVICES=0
fleetrun train.py
GPU单机多卡训练
若启动单机4卡的任务,只需通过--gpus
指定空闲的4张卡即可。
fleetrun --gpus=0,1,2,3 train.py
注:如果指定了export CUDA_VISIBLE_DEVICES=0,1,2,3
,则可以直接使用:
export CUDA_VISIBLE_DEVICES=0,1,2,3
fleetrun train.py
GPU多机多卡训练
[示例一] 2机8卡 (每个节点4卡)
fleetrun --ips="xx.xx.xx.xx,yy.yy.yy.yy" --gpus=0,1,2,3 train.py
注:如果每台机器均指定了export CUDA_VISIBLE_DEVICES=0,1,2,3
,则可以直接在每台节点上启动:
export CUDA_VISIBLE_DEVICES=0,1,2,3
fleetrun --ips="xx.xx.xx.xx,yy.yy.yy.yy" train.py
[示例二] 2机16卡(每个节点8卡,假设每台机器均有8卡可使用)
fleetrun --ips="xx.xx.xx.xx,yy.yy.yy.yy" train.py
参数服务器训练¶
在CPU集群运行参数服务器¶
参数服务器训练 - 单机模拟分布式训练
1台机器通过多进程模拟分布式训练,1个服务节点搭配4个训练节点。
fleetrun
启动时只需指定服务节点数--server_num
和训练节点数--worker_num
,即可进行单机模拟分布式训练,推荐使用此方法进行本地调试。
fleetrun --server_num=1 --worker_num=4 train.py
参数服务器训练 - 自定义多机训练
fleetrun
启动时只需指定服务节点的ip和端口列表--servers
和训练节点的ip列表--workers
,即可进行多机训练。
下列示例中,xx.xx.xx.xx代表机器1,yy.yy.yy.yy代表机器2,6170代表用户指定的服务节点的端口。fleetrun
将分别在2台机器上启动1个服务节点,4个训练节点。
# 2个servers 8个workers
fleetrun --servers="xx.xx.xx.xx:6170,yy.yy.yy.yy:6171" --workers="xx.xx.xx.xx,xx.xx.xx.xx,xx.xx.xx.xx,xx.xx.xx.xx,yy.yy.yy.yy,yy.yy.yy.yy,yy.yy.yy.yy,yy.yy.yy.yy" train.py
--workers
参数可以仅指定ip列表,此时fleetrun
将会在启动训练任务前分配好连续端口给每个训练节点。fleetrun
分配的连续端口可能会出现端口被其他任务占用的情况,此时多机训练无法正常启动。因此--workers
参数支持配置用户指定端口,写法与--servers
一致,示例如下:
# 2个servers 8个workers
fleetrun --servers="xx.xx.xx.xx:6170,yy.yy.yy.yy:6171" --workers="xx.xx.xx.xx:6172,xx.xx.xx.xx:6173,xx.xx.xx.xx:6174,xx.xx.xx.xx:6175,yy.yy.yy.yy:6176,yy.yy.yy.yy:6177,yy.yy.yy.yy:6178,yy.yy.yy.yy:6179" train.py
在GPU集群运行参数服务器¶
参数服务器训练 - 单机模拟分布式训练
1台机器通过多进程模拟,2个服务节点搭配4个训练节点,每个训练节点占用一张GPU卡,服务节点不占用GPU卡。
# 2个server 4个worker
export CUDA_VISIBLE_DEVICES=0,1,2,3
fleetrun --server_num=2 --worker_num=4 train.py
1台机器通过多进程模拟, 2个服务节点搭配2个训练节点,两个训练节点共用一张GPU卡,服务节点不占用GPU卡。
# 2个server 2个worker
export CUDA_VISIBLE_DEVICES=0
fleetrun --server_num=2 --worker_num=2 train.py
参数服务器训练 - 自定义多机训练
fleetrun
启动时只需指定服务节点的ip和端口列表--servers
和
训练节点的ip和端口列表--workers
,即可进行多机训练。
以下示例中,xx.xx.xx.xx代表机器1,yy.yy.yy.yy代表机器2,6170代表用户指定的服务节点的端口。fleetrun
将分别在2台机器上启动1个服务节点,1个训练节点。训练节点会分别占用其机器上的0号GPU卡进行训练。
# 2台机器,每台机器均有1个服务节点,1个训练节点
# 2个server 2个worker
# 每台机器均指定了可用设备 GPU:0
export CUDA_VISIBLE_DEVICES=0
fleetrun --servers="xx.xx.xx.xx:6170,yy.yy.yy.yy:6171" --workers="xx.xx.xx.xx:6172,yy.yy.yy.yy:6173" train.py
以下示例中,fleetrun
将分别在2台机器上启动1个服务节点,4个训练节点。训练节点会分别占用其机器上的0,1,2,3号GPU卡进行训练。
# 2台机器,每台机器均有1个服务节点,4个训练节点
# 2个server 4个worker
# 每台机器均指定了可用设备 GPU:0,1,2,3
export CUDA_VISIBLE_DEVICES=0,1,2,3
fleetrun --servers="xx.xx.xx.xx:6170,yy.yy.yy.yy:6171" --workers="xx.xx.xx.xx:6172,xx.xx.xx.xx:6173,xx.xx.xx.xx:6174,xx.xx.xx.xx:6175,yy.yy.yy.yy:6176,yy.yy.yy.yy:6177,yy.yy.yy.yy:6178,yy.yy.yy.yy:6179" train.py
异构集群运行参数服务器¶
参数服务器训练 - 单机模拟分布式训练
1台机器通过多进程模拟,2个服务节点搭配2个训练节点以及2个异构训练节点,每个异构训练节点占用一张GPU卡,其余服务节点和训练节点均在CPU上执行。
# 2个server 4个worker
export CUDA_VISIBLE_DEVICES=0,1
fleetrun --server_num=2 --worker_num=2 --heter_worker_num=2 train.py
fleetrun命令参数介绍¶
Collective模式相关参数:
ips (str,可选): 指定选择哪些节点IP进行训练,默认为『127.0.0.1』, 即会在本地执行单机单卡或多卡训练。
gpus(str, 可选):指定选择哪些GPU卡进行训练,默认为None,即会选择
CUDA_VISIBLE_DEVICES
所显示的所有卡。不设置nproc_per_node
参数时,将启动GPU个数个进程进行训练,每个进程绑定一个GPU卡。nproc_per_node (int, 可选):设置多少个进程进行训练。设置数目需要少于或者等于参与训练的GPU的个数以便每个进程可以绑定一个或者平均个数的GPU卡;不能使用GPU训练时,会启动相应个数的CPU进程进行Collective训练。
参数服务器模式可配参数:
server_num(int,可选):单机模拟分布式任务中,指定参数服务器服务节点的个数
worker_num(int,可选):单机模拟分布式任务中,指定参数服务器训练节点的个数
heter_worker_num(int,可选):在异构集群中启动单机模拟分布式任务,指定参数服务器异构训练节点的个数
servers(str, 可选): 多机分布式任务中,指定参数服务器服务节点的IP和端口
workers(str, 可选): 多机分布式任务中,指定参数服务器训练节点的IP和端口,也可只指定IP
heter_workers(str, 可选): 在异构集群中启动分布式任务,指定参数服务器异构训练节点的IP和端口
http_port(int, 可选):参数服务器模式中,用Gloo启动时设置的连接端口
其他:
log_dir(str, 可选): 指定分布式任务训练日志的保存路径,默认保存在“./log/”目录。
使用fleetrun进行GPU多卡训练实例¶
下面我们将通过例子,为您详细介绍如何利用fleetrun
将单机单卡训练任务转换为单机多卡训练任务。
这里使用与静态图分布式训练快速开始 相同的示例代码进行说明。
import os
import time
import paddle
import paddle.distributed.fleet as fleet
import paddle.static.nn as nn
import paddle.fluid as fluid
def mnist_on_mlp_model():
train_dataset = paddle.vision.datasets.MNIST(mode='train')
test_dataset = paddle.vision.datasets.MNIST(mode='test')
x = paddle.data(name="x", shape=[64, 1, 28, 28], dtype='float32')
y = paddle.data(name="y", shape=[64, 1], dtype='int64')
x_flatten = fluid.layers.reshape(x, [64, 784])
fc_1 = nn.fc(input=x_flatten, size=128, act='tanh')
fc_2 = nn.fc(input=fc_1, size=128, act='tanh')
prediction = nn.fc(input=[fc_2], size=10, act='softmax')
cost = fluid.layers.cross_entropy(input=prediction, label=y)
acc_top1 = fluid.layers.accuracy(input=prediction, label=y, k=1)
avg_cost = fluid.layers.mean(x=cost)
return train_dataset, test_dataset, x, y, avg_cost, acc_top1
paddle.enable_static()
train_data, test_data, x, y, cost, acc = mnist_on_mlp_model()
place = paddle.CUDAPlace(int(os.environ.get('FLAGS_selected_gpus', 0)))
train_dataloader = paddle.io.DataLoader(
train_data, feed_list=[x, y], drop_last=True,
places=place, batch_size=64, shuffle=True)
fleet.init(is_collective=True)
strategy = fleet.DistributedStrategy()
#optimizer = paddle.optimizer.Adam(learning_rate=0.01)
optimizer = fluid.optimizer.Adam(learning_rate=0.001)
optimizer = fleet.distributed_optimizer(optimizer, strategy=strategy)
optimizer.minimize(cost)
exe = paddle.static.Executor(place)
exe.run(paddle.static.default_startup_program())
epoch = 10
for i in range(epoch):
total_time = 0
step = 0
for data in train_dataloader():
step += 1
start_time = time.time()
loss_val, acc_val = exe.run(
paddle.static.default_main_program(),
feed=data, fetch_list=[cost.name, acc.name])
if step % 200 == 0:
end_time = time.time()
total_time += (end_time - start_time)
print(
"epoch: %d, step:%d, train_loss: %f, total time cost = %f, speed: %f"
% (i, step, loss_val[0], total_time,
1 / (end_time - start_time) ))
单机单卡训练¶
将上述代码保存在train.py
代码中,单机单卡训练十分的简单,只需要:
export CUDA_VISIBLE_DEVICES=0
python train.py
可以看见终端上打印日志信息:
epoch: 0, step:200, train_loss: 0.424425, total time cost = 0.000947, speed: 1055.967774
epoch: 0, step:400, train_loss: 0.273742, total time cost = 0.001725, speed: 1285.413423
epoch: 0, step:600, train_loss: 0.472131, total time cost = 0.002467, speed: 1347.784062
epoch: 0, step:800, train_loss: 0.445613, total time cost = 0.003184, speed: 1394.382979
epoch: 1, step:200, train_loss: 0.512807, total time cost = 0.000681, speed: 1468.593838
epoch: 1, step:400, train_loss: 0.571385, total time cost = 0.001344, speed: 1508.199928
epoch: 1, step:600, train_loss: 0.617232, total time cost = 0.002034, speed: 1449.310297
epoch: 1, step:800, train_loss: 0.392537, total time cost = 0.002813, speed: 1283.446756
epoch: 2, step:200, train_loss: 0.288508, total time cost = 0.000796, speed: 1256.155735
epoch: 2, step:400, train_loss: 0.448433, total time cost = 0.001531, speed: 1360.461888
epoch: 2, step:600, train_loss: 0.593330, total time cost = 0.002292, speed: 1314.005013
...
单机多卡训练¶
从单机单卡训练到单机多卡训练不需要改动train.py
代码,只需改一行启动命令:
export CUDA_VISIBLE_DEVICES=0,1,2,3
fleetrun train.py
训练日志可以在终端上查看,也可稍后在./log/目录下查看每个卡的日志。 终端可以看到显示日志如下:
----------- Configuration Arguments -----------
gpus: 0,1,2,3
ips: 127.0.0.1
log_dir: log
server_num: None
servers:
training_script: train.py
training_script_args: []
worker_num: None
workers:
------------------------------------------------
INFO 202X-0X-0X 06:09:36,185 launch_utils.py:425] Local start 4 processes. First process distributed environment info (Only For Debug):
=======================================================================================
Distributed Envs Value
---------------------------------------------------------------------------------------
PADDLE_CURRENT_ENDPOINT 127.0.0.1:33360
PADDLE_TRAINERS_NUM 4
FLAGS_selected_gpus 0
PADDLE_TRAINER_ENDPOINTS ... 0.1:11330,127.0.0.1:54803,127.0.0.1:49294
PADDLE_TRAINER_ID 0
=======================================================================================
epoch: 0, step:200, train_loss: 0.306129, total time cost = 0.001170, speed: 854.759323
epoch: 0, step:400, train_loss: 0.287594, total time cost = 0.002226, speed: 947.009257
epoch: 0, step:600, train_loss: 0.179934, total time cost = 0.003201, speed: 1025.752996
epoch: 0, step:800, train_loss: 0.137214, total time cost = 0.005004, speed: 554.582044
epoch: 1, step:200, train_loss: 0.302534, total time cost = 0.000975, speed: 1025.752996
epoch: 1, step:400, train_loss: 0.375780, total time cost = 0.001934, speed: 1042.581158
epoch: 1, step:600, train_loss: 0.247651, total time cost = 0.002892, speed: 1043.878547
epoch: 1, step:800, train_loss: 0.086278, total time cost = 0.003845, speed: 1049.363022
.....
FleetX快速开始¶
FleetX是什么?¶
FleetX
提供效率最高的分布式模型预训练功能,它可以作为paddle.distributed.fleet
的扩展进行配合使用。
提供哪些功能?¶
短代码定义预训练模型
预置经典模型的公开训练数据
用户可低成本替换自有数据集
面向每个模型的最佳分布式训练实践
上手示例¶
以下通过图像分类Resnet50的例子,说明如何使用FleetX的接口进行分布式训练。具体步骤如下:
导入依赖
构建模型
定义分布式策略
开始训练
为了简化模型定义的过程,我们在后面的文档中会尽量使用FleetX封装的高级API,方便用户理解分布式训练的核心内容。
1. 导入依赖¶
FleetX依赖Paddle 1.8.0及之后的版本。请确认已安装正确的Paddle版本,并按照以下方式导入Paddle 及 FleetX。
import paddle
import paddle.distributed.fleet as fleet
import fleetx as X
2. 构建模型¶
通过FleetX提供的 X.applications
接口,用户可以使用一行代码加载一些经典的深度模型,如:Resnet50,VGG16,BERT,Transformer等。同时,用户可以使用一行代码加载特定格式的数据,如对于图像分类任务,用户可以加载ImageNet格式的数据。
paddle.enable_static()
configs = X.parse_train_configs()
model = X.applications.Resnet50()
downloader = X.utils.Downloader()
local_path = downloader.download_from_bos(fs_yml="https://xxx.xx.xx.xx/full_imagenet_bos.yml", local_path='./data')
loader = model.get_train_dataloader(local_path, batch_size=32)
3. 定义分布式策略¶
在定义完单机训练网络后,用户可以使用paddle.distributed.fleet.DistributedStrategy()
接口定义分布式策略,将模型转成分布式模式。
# 使用paddle.distributed.fleet进行collective training
fleet.init(is_collective=True)
# 定义DistributedStrategy
dist_strategy = fleet.DistributedStrategy()
# 装饰单机optimizer为分布式optimizer
optimizer = paddle.distributed.fleet.distributed_optimizer(optimizer, dist_strategy)
optimizer.minimize(model.loss)
4. 开始训练¶
可以使用FleetX
内置的训练器进行快速训练,方便算法工程师快速上手:
trainer = X.MultiGPUTrainer()
trainer.fit(model, loader, epoch=10)
用户也可以采用Paddle原生的API进行训练流程的定义,代码如下:
exe = fluid.Executor(place)
exe.run(fluid.default_startup_program())
for epoch_id in range(5):
step_id = 0
for data in loader:
cost_val = exe.run(paddle.default_main_program(),
feed=data,
fetch_list=[model.loss.name])
if step_id % 100 == 0:
print("worker index: %d, epoch: %d, step: %d, train loss: %f"
% (fleet.worker_index(), epoch_id, step_id, cost_val[0]))
从Paddle 2.0 rc版本开始,我们统一采用fleetrun进行多卡训练的启动,方式如下:
fleetrun --gpus 0,1,2,3 resnet_app.py
关于fleetrun
命令,更详细的使用说明请参考fleetrun
使用fleetsub提交集群任务¶
fleetsub
是什么¶
当您安装了fleet-x
后,便可以使用fleetsub
在集群上提交分布式任务。长期的目标是成为集群任务提交的统一命令,只需要一行启动命令,就会将训练任务提交到离线训练集群中。目前该功能只支持百度公司内部云上的任务提交,使用fleetsub
前需要先安装paddlecloud客户端,后续我们会支持更多的公有云任务提交。
使用要求¶
使用fleetsub
命令的要求:安装fleet-x
【方法一】从pip源安装
pip install fleet-x
【方法二】下载whl包并在本地安装
# python2
wget --no-check-certificate https://fleet.bj.bcebos.com/fleet_x-0.0.4-py2-none-any.whl
pip install fleet_x-0.0.4-py2-none-any.whl
# python3
wget --no-check-certificate https://fleet.bj.bcebos.com/fleet_x-0.0.4-py3-none-any.whl
pip3 install fleet_x-0.0.4-py3-none-any.whl
使用说明¶
在提交任务前,用户需要在yaml文件中配置任务相关的信息,如:节点数、镜像地址、集群信息、启动训练所需要的命令等。
首先看一个yaml文件的样例。因为信息安全的原因yaml文件中的信息做了脱敏。
num_trainers: 4
num_cards: 8
job_prefix: bert_base_pretraining
image_addr: ${image_addr:-"dockhub.com/paddlepaddle-public/paddle_ubuntu1604:cuda10.0-cudnn7-dev"}
cluster_name: v100-32-cluster
group_name: k8s-gpu-v100-8
server: paddlecloud.server.com
log_fs_name: "afs://xx.fs.com:9902"
log_fs_ugi: "ugi_name,ugi_passwd"
log_output_path: "/xx/yy/zz"
file_dir: "./"
whl_install_commands:
- pip install fleet_x-0.0.5-py2-none-any.whl
- pip install paddlepaddle_gpu-0.0.0-cp27-cp27mu-linux_x86_64.whl
commands:
- fleetrun bert_base.py --download_config=bert.yaml
字段名称 |
字段含义 |
类型 |
---|---|---|
num_trainers |
训练节点的数量 |
INT |
num_cards |
单节点上申请的GPU卡数 |
INT |
job_prefix |
任务名前缀 |
STRING |
image_addr |
镜像地址 |
{STRING} |
cluster_name |
集群名 |
STRING |
group_name |
群组名 |
STRING |
server |
集群master节点服务名 |
STRING |
log_fs_name |
任务日志存放的文件系统名 |
STRING |
log_fs_ugi |
任务日志存放的文件系统UGI |
STRING |
log_output_path |
任务日志存放的目标文件系统地址 |
STRING |
file_dir |
提交任务需要上传的文件目录 |
STRING |
whl_install_commands |
安装各种wheel包的命令 |
Repeated Command Line |
commands |
运行任务执行的各种命令 |
Repeated Command Line |
使用Fleet进行参数服务器训练¶
在大数据浪潮的推动下,有标签训练数据的规模取得了飞速的增长。现在人们通常用数百万甚至上千万的有标签图像来训练图像分类器(如,ImageNet包含1400万幅图像,涵盖两万多个种类),用成千上万小时的语音数据来训练语音模型(如,Deep Speech 2系统使用了11940小时的语音数据以及超过200万句表述来训练语音识别模型)。在真实的业务场景中,训练数据的规模可以达到上述数据集的数十倍甚至数百倍,如此庞大的数据需要消耗大量的计算资源和训练时间使模型达到收敛状态(数天时间)。
为了提高模型的训练效率,分布式训练应运而生,其中基于参数服务器的分布式训练为一种常见的中心化共享参数的同步方式。与单机训练不同的是在参数服务器分布式训练中,各个节点充当着不同的角色:
训练节点:该节点负责完成数据读取、前向计算、反向梯度计算等过程,并将计算出的梯度上传至服务节点。
服务节点:在收到所有训练节点传来的梯度后,该节点会将梯度聚合并更新参数。最后将参数发送给训练节点,开始新一轮的训练。
根据参数更新的方式不同,可以分为同步和异步两种:
同步训练:在同步参数服务器分布式训练中,所有训练节点的进度保持一致。每训练完一个Batch后,训练节点会上传梯度,然后开始等待服务节点返回更新后的参数。服务节点拿到所有训练节点上传的梯度后,才会对参数进行更新。因此,在任何一个时间点,所有训练节点都处于相同的训练阶段。
异步训练:与同步训练不同,在异步训练中任何两个训练节点之间的参数更新都互不影响。每一个训练节点完成训练、上传梯度后,服务节点都会立即更新参数并将结果返回至相应的训练节点。拿到最新的参数后,该训练节点会立即开始新一轮的训练。
下面我们将通过例子,为您介绍同步/异步训练在Fleet中的实现。
在开始之前我们首先需要下载训练中所需要的数据:
# 下载并解压数据,训练数据讲保存至名为 raw_data 的文件夹
wget --no-check-certificate https://fleet.bj.bcebos.com/ctr_data.tar.gz
tar -zxvf ctr_data.tar.gz
实用样例¶
下面我们来介绍如何用Fleet接口,完成参数服务器分布式训练(假设训练脚本为ctr_app.py)。
导入依赖¶
import paddle
import os
import fleetx as X
import paddle.fluid as fluid
import paddle.distributed.fleet as fleet
import paddle.distributed.fleet.base.role_maker as role_maker
定义分布式模式并初始化¶
通过X.parse_train_configs()
接口,用户可以定义训练相关的参数,如:学习率、衰减率等。同时通过fleet.init()
接口定义了分布式模型,init()
接口默认使用参数服务器模式,所以用户不需要定
义任何参数。
paddle.enable_static()
configs = X.parse_train_configs()
role = role_maker.PaddleCloudRoleMaker()
fleet.init(role)
加载模型及数据¶
用户可以通过X.applications
接口加载我们预先定义好的模型。在这个例子中我们将使用CTR-DNN模型,同时用户可以为模型定制的data_loader接口加载数据.
model = X.applications.MultiSlotCTR()
loader = model.load_criteo_from_file('./train_data')
定义同步训练 Strategy 及 Optimizer¶
在Fleet
API中,用户可以使用fleet.DistributedStrategy()
接口定义自己想要使用的分布式策略。
其中a_sync
选项用于定义参数服务器相关的策略,当其被设定为False
时,分布式训练将在同步的模式下进行。反之,当其被设定成True
时,分布式训练将在异步的模式下进行。
dist_strategy = fleet.DistributedStrategy()
dist_strategy.a_sync = False
optimizer = fluid.optimizer.SGD(learning_rate=0.0001)
optimizer = fleet.distributed_optimizer(optimizer, dist_strategy)
optimizer.minimize(model.loss)
开始训练¶
完成模型及训练策略以后,我们就可以开始训练模型了。因为在参数服务器模式下会有不同的角色,所以根据不同节点分配不同的任务。
对于服务器节点,首先用init_server()
接口对其进行初始化,然后启动服务并开始监听由训练节点传来的梯度。
同样对于训练节点,用init_worker()
接口进行初始化后,
开始执行训练任务。运行X.Trainer.fit
接口开始训练,并得到训练中每一步的损失值。
if fleet.is_server():
fleet.init_server()
fleet.run_server()
else:
fleet.init_worker()
trainer = X.Trainer(fluid.CPUPlace())
trainer.fit(model, loader, epoch=10)
运行训练脚本¶
定义完训练脚本后,我们就可以用fleetrun
指令运行分布式任务了。其中server_num
,
worker_num
分别为服务节点和训练节点的数量。在本例中,服务节点有1个,训练节点有两个。
fleetrun --server_num=1 --worker_num=2 ctr_app.py
使用InMemoryDataset/QueueDataset进行训练¶
简介¶
为了能高速运行模型的训练,我们使用InMemoryDataset/QueueDataset
API进行高性能的IO,具体介绍可以参考文档InMemoryDataset
和 QueueDataset
,
以下简称Dataset。Dataset是为多线程及全异步方式量身打造的数据读取方式,每个数据读取线程会与一个训练线程耦合,形成了多生产者-多消费者的模式,会极大的加速我们的模型训练。
本文以训练word2vector模型为例,在训练中引入基于Dataset API读取训练数据的方式,我们直接加载Fleetx预先定义好的word2vector模型,省去一切前期组网调试阶段,无需变更数据格式,只需在我们原本的训练代码中加入以下内容,便可轻松使用Dataset接口来进行训练。以下是使用Dataset接口一个比较完整的流程:
引入dataset¶
通过
dataset = paddle.distributed.InMemoryDataset()
或者dataset = paddle.distributed.QueueDataset()
创建一个Dataset对象指定dataset读取的训练文件的列表, 通过
set_filelist
配置。通过
dataset.init()
api 进行Dataset的初始化配置,init()
接口接收**kwargs参数, 详见api文档,列举几个配置的初始化将我们定义好的数据输入格式传给Dataset, 通过
use_var
配置。指定我们的数据读取方式,由
my_data_generator.py
实现数据读取的规则,后面将会介绍读取规则的实现, 通过pipe_command
配置。pipe_command
是Dataset特有的通过管道来读取训练样本的方式,通过set_filelist
设置的训练样本文件将被作为管道的输入cat
到管道中经过用户自定义的pipe_command
最终输出。指定数据读取的batch_size,通过batch_size配置。
指定数据读取的线程数,一般该线程数和训练线程应保持一致,两者为耦合的关系,通过
thread_num
配置。
dataset = paddle.distributed.InMemoryDataset()
batch_size = config.config["batch_size"]
thread_num = config.config["thread_num"]
dataset.init(use_var=model.inputs, pipe_command="python my_data_generator.py", batch_size=batch_size, thread_num=thread_num)
dataset.set_filelist([config.config["train_files_path"]])
如何指定数据读取规则¶
在上文我们提到了由my_data_generator.py
实现具体的数据管道读取规则,那么,怎样为dataset创建数据读取的规则呢?
以下是my_data_generator.py
的全部代码,具体流程如下: 1.
首先我们需要引入data_generator的类,位于paddle.distributed.fleet.data_generator
。
2.
声明一些在数据读取中会用到的类和库,如示例代码中的NumpyRandomInt
、logger
等。
3.
创建一个子类Word2VecReader
,继承fleet.data_generator
的基类,基类有多种选择,如果是多种数据类型混合,并且需要转化为数值进行预处理的,建议使用MultiSlotDataGenerator
;若已经完成了预处理并保存为数据文件,可以直接以string
的方式进行读取,使用MultiSlotStringDataGenerator
,能够进一步加速。在示例代码,我们继承并实现了名为Word2VecReader
的data_generator子类,使用MultiSlotDataGenerator
方法。
4.
继承并实现基类中的generate_sample
函数,逐行读取数据。该函数应返回一个可以迭代的reader方法(带有yield的函数不再是一个普通的函数,而是一个生成器generator,成为了可以迭代的对象,等价于一个数组、链表、文件、字符串etc.)
5.
在这个可以迭代的函数中,如示例代码中的def nce_reader()
,我们定义数据读取的逻辑。例如对以行为单位的数据进行截取,转换及预处理。
最后,我们需要将数据整理为特定的batch的格式,才能够被dataset正确读取,并灌入的训练的网络中。继承并实现基类中的
generate_batch
函数, 根据设定的’batch_size’, 该函数会在generator_sample
函数产生样本数达到batch_size
时,调用该函数内队逐条样本的处理逻辑,如示例代码中的def local_iter()
。简单来说,数据的输出顺序与我们在网络中创建的
inputs
必须是严格一一对应的,并转换为类似字典的形式。在示例代码中,我们将参数名与数值构成的元组组成了一个list,并将其yield输出。如果展开来看,我们输出的数据形如[('input_word',[value]),('true_label',[value]),('neg_label',[value])]
import sys
import io
import os
import re
import collections
import time
import config
import logging
import paddle
import numpy as np
import paddle.distributed.fleet as fleet
logging.basicConfig(format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger("fluid")
logger.setLevel(logging.INFO)
class NumpyRandomInt(object):
def __init__(self, a, b, buf_size=1000):
self.idx = 0
self.buffer = np.random.random_integers(a, b, buf_size)
self.a = a
self.b = b
def __call__(self):
if self.idx == len(self.buffer):
self.buffer = np.random.random_integers(self.a, self.b,
len(self.buffer))
self.idx = 0
result = self.buffer[self.idx]
self.idx += 1
return result
class Word2VecReader(fleet.MultiSlotDataGenerator):
def init(self,
dict_path,
nce_num,
window_size=5):
self.window_size_ = window_size
self.nce_num = nce_num
word_all_count = 0
id_counts = []
word_id = 0
with io.open(dict_path, 'r', encoding='utf-8') as f:
for line in f:
word, count = line.split()[0], int(line.split()[1])
word_id += 1
id_counts.append(count)
word_all_count += count
self.word_all_count = word_all_count
self.corpus_size_ = word_all_count
self.dict_size = len(id_counts)
self.id_counts_ = id_counts
logger.info("corpus_size:", self.corpus_size_)
self.id_frequencys = [
float(count) / word_all_count for count in self.id_counts_
]
logger.info("dict_size = " + str(self.dict_size) + " word_all_count = " + str(word_all_count))
self.random_generator = NumpyRandomInt(1, self.window_size_ + 1)
def get_context_words(self, words, idx):
"""
Get the context word list of target word.
words: the words of the current line
idx: input word index
window_size: window size
"""
target_window = self.random_generator()
start_point = idx - target_window # if (idx - target_window) > 0 else 0
if start_point < 0:
start_point = 0
end_point = idx + target_window
targets = words[start_point:idx] + words[idx + 1:end_point + 1]
return targets
def generate_batch(self, samples):
def local_iter():
np_power = np.power(np.array(self.id_frequencys), 0.75)
id_frequencys_pow = np_power / np_power.sum()
cs = np.array(id_frequencys_pow).cumsum()
result = [[], []]
for sample in samples:
tensor_result = [("input_word", []), ("true_label", []), ("neg_label", [])]
tensor_result[0][1].extend(sample[0])
tensor_result[1][1].extend(sample[1])
neg_array = cs.searchsorted(np.random.sample(self.nce_num))
tensor_result[2][1].extend(neg_array)
yield tensor_result
return local_iter
def generate_sample(self, line):
def nce_reader():
word_ids = [int(w) for w in line.split()]
for idx, target_id in enumerate(word_ids):
context_word_ids = self.get_context_words(
word_ids, idx)
for context_id in context_word_ids:
yield [target_id], [context_id]
return nce_reader
if __name__ == "__main__":
my_data_generator = Word2VecReader()
my_data_generator.init(config.config["dict_path"], config.config["nce_num"])
my_data_generator.set_batch(config.config["batch_size"])
my_data_generator.run_from_stdin()
快速调试Dataset¶
我们可以脱离组网架构,单独验证Dataset的输出是否符合我们预期。使用命令
cat 数据文件 | python dataset读取python文件
进行dataset代码的调试:
cat train_data/part_912 | python my_data_generator.py
输出的数据格式如下:
input_word:size ; input_word:value ; true_label:size ; true_label:value ; neg_label:size ; neg_label:value
理想的输出为(截取了一个片段):
...
1 112 1 2739 5 6740 451 778 90446 3698
...
使用Dataset的一些注意事项 - Dataset的基本原理:将数据print到缓存,再由C++端的代码实现读取,因此,我们不能在dataset的读取代码中,加入与数据读取无关的print信息,会导致C++端拿到错误的数据信息。 - dataset目前只支持在
unbuntu
及CentOS
等标准Linux环境下使用,在Windows
及Mac
下使用时,会产生预料之外的错误,请知悉。
训练¶
我们把原来的训练代码:
trainer = X.CPUTrainer()
trainer.fit(model, loader, epoch=10)
替换成如下使用Dataset
训练的流程, 我们以一个epoch为例:
import paddle
import paddle.fluid as fluid
import paddle.distributed.fleet as fleet
import config
# 开启paddle静态图模式
paddle.enable_static()
fleet.init()
model = X.applications.Word2vec()
"""
need config loader correctly.
"""
loader = model.load_dataset_from_file(train_files_path=[config.config["train_files_path"]], dict_path=config.config["dict_path"])
dist_strategy = fleet.DistributedStrategy()
dist_strategy.a_sync = True
optimizer = fluid.optimizer.SGD(learning_rate=0.0001)
optimizer = fleet.distributed_optimizer(optimizer, dist_strategy)
optimizer.minimize(model.loss)
if fleet.is_server():
fleet.init_server()
fleet.run_server()
else:
place = paddle.CPUPlace()
fleet.init_worker()
exe = paddle.static.Executor(place)
default_startup_program = paddle.static.Program()
default_main_program = paddle.static.Program()
scope1 = fluid.Scope()
with fluid.scope_guard(scope1):
exe.run(model.startup_prog)
dataset = paddle.distributed.QueueDataset()
batch_size = config.config["batch_size"]
thread_num = config.config["thread_num"]
dataset.init(use_var=model.inputs, pipe_command="python my_data_generator.py", batch_size=batch_size, thread_num=thread_num)
dataset.set_filelist([config.config["train_files_path"]])
with fluid.scope_guard(scope1):
exe.train_from_dataset(model.main_prog, dataset, scope1, debug=False, fetch_list=[model.loss], fetch_info=["loss"], print_period=10)
fleet.stop_worker()
最后添加上述代码使用的配置文件config.py
config = dict()
config["dict_path"] = "thirdparty/test_build_dict"
config["train_files_path"] = "demo_train_data/part_1"
config["batch_size"] = 1000
config["nce_num"] = 5
config["thread_num"] = 12
通过以上简洁的代码,即可以实现word2vector模型的多线程并发训练
Collective 同步训练实践¶
同步训练简介¶
许多研究表明深度学习的预训练受益于更多的数据[1] [2] [3],但更大的数据量也意味着更长的训练耗时,数据并行同步训练是一种加速大规模数据训练的方法,有PServer和Collective两种模式。
同步训练通过数据划分,将计算工作量(前向、反向)分布到GPU 集群中的每一个worker上, 提高整体计算吞吐。但参数更新(update) 的过程在两种模式中有所不同:
在
PServer模式
中,会启动多个pservers 和多个trainers,每个pserver会保存一部分模型参数,并负责接收从trainer发送的梯度并更新这些模型参数;每个trainer 会保存一份完整的模型,并使用一部分数据进行训练,然后向pserver发送梯度,最后从pserver拉取更新后的参数。 pserver进程和trainer可以在不同的计算节点上,也可以在同一公用节点。一个分布式任务所需要的pserver进程个数通常需要根据实际情况调整,以达到最佳的性能,然而通常来说pserver的进程不会比trainer更多。

在
Collective模式
中,集群中只存在多个地位平等的trainers。 每个trainer进程都保存一份完整的模型参数。 前向和反向中每个 trainer 使用自己划分 (shard)的数据进行计算,得到对应的梯度;之后trainers 之间通过 allreduce 等 Collective 通信方式[4] 同步梯度到所有trainers,最后每个 trainer 使用同步后的梯度独立完成参数更新。

相交于异步训练, 同步训练的的优势在于Loss可以比较稳定的下降,缺点是整体速度的快慢取决于最慢的trainer. 因此在训练较为复杂的模型时,即模型训练过程中神经网络训练耗时远大于节点间通信耗时的场景下,推荐使用同步训练模式。
Fleet中 PServer模式使用 gRPC 通信,Collective模式使用 NCCL2 通信。
下文将由三部分组成:
介绍 Fleet 同步训练中常用的几个策略 、优化
结合上述常用优化,给出一个在 4节点 32 V100 集群 训练 ResNet50的示例代码
完整 Fleet 同步训练参数策略介绍
Fleet Collective 同步训练优化¶
Fleet 支持在 GPU (CUDA 版本 >= 7.5) 服务器集群上完成高性能分布式训练。
用户可以通过 fleet.DistributedStrategy
设置许多与训练性能策略相关参数。目前Fleet
为这些参数提供了一个较通用默认值,用户可以不去调整。但如果用户希望针对性调优分布式训练的性能,可以根据自身硬件和任务设置对应参数。
在进行性能优化时, 检查每项优化点并验证对应提升,最终获得最优性能。 一个简单的验证当前的训练程序是否需要进一步优化性能的方法, 是查看GPU的计算利用率,通常用 nvidia-smi 命令查看。 如果GPU利用率较低,则可能存在较大的优化空间。
下文将介绍对性能影响较大,设置频率比较高的几个参数,详细的参数列表放在文末的附录中。
注意: 使用NCCL2模式分布式训练时,需要确保每个节点训练等量的数据,防止在最后一轮训练中任务不退出。通常有两种方式:
随机采样一些数据,补全分配到较少数据的节点上。(推荐使用这种方法,以训练完整的数据集)。
在python代码中,每个节点每个pass只训练固定的batch数,如果这个节点数据较多,则不训练这些多出来的数据。
OP融合¶
将模型网络中顺序执行的多个OPs进行融合能够减少OP 调度的开销,提升训练速度。目前Fleet 中支持如下3种的OP 融合:
fuse_all_optimizer_ops
:表明是否融合(fuse) 是否融合 optimizer_op,仅对部分 optimizer 可用(SGD、Adam和Momentum)。fuse_elewise_add_act_ops
:表明是否融合(fuse) elementwise_add_op和activation_op。fuse_bn_act_ops
:表明是否融合(fuse) batch_norm_op 和 activation_op。
通常使用这些策略都会使整体执行过程更快。
dist_strategy = fleet.DistributedStrategy()
dist_strategy.fuse_all_optimizer_ops = True
dist_strategy.fuse_bn_act_ops = True
dist_strategy.fuse_elewise_add_act_ops = True
AllReduce融合¶
AllReduce 融合默认情况下会将同一layer中参数的梯度的多个AllReduce操作合并成一个。 比如对于 fluid.layers.fc 中有Weight和Bias两个参数,打开该选项之前,需要两次AllReduce操作;打开该选项之后,只用一次AllReduce 操作。这样可以减少梯度同步时的通信耗时。
此外,为支持更大粒度的参数梯度融合,Fleet 提供了以下两个选项,用户可以在训练程序运行前在DistributedStrategy中设置:
fuse_grad_size_in_MB
: 指定每个AllReduce操作的梯度字节数,如该参数等于16 则每次AllReduce调用传输16MB的梯度。 该参数的经验值为总通信量的十分之一。fuse_grad_size_in_TFLOPS
: 指定每次AllReduce操作的最大层数,即到达该层数就进行AllReduce。如该参数等于50, 则最多每50层做一次 fused AllReduce。
注意: AllReduce融合目前不支持sparse参数梯度。
dist_strategy = fleet.DistributedStrategy()
dist_strategy.fuse_grad_size_in_MB=16
dist_strategy.fuse_grad_size_in_TFLOPS=50
dist_strategy.fuse_all_reduce_ops=True
分层 AllReduce¶
对于多机模式,针对小数据量的通信,Ring AllReduce通信效率低,采用Hierarchical AllReduce可以缓解这一问题。 分层AllReduce 运行如下图所示:

dist_strategy = fleet.DistributedStrategy()
dist_strategy.use_hierarchical_allreduce = True
dist_strategy.hierarchical_allreduce_inter_nranks = 8
使用同步Allreduce¶
Fleet 使用多进程+NCCL2模式(collective)以获得更好的性能。 在多进程模式下,每台服务器的每个GPU卡都会对应启动一个训练进程, 集群中的所有进程之间会互相通信完成训练。以此方式最大限度的降低进程内部资源抢占的开销。
dist_strategy.sync_nccl_allreduce=True
设置合适的nccl通信器数量¶
nccl通信器数量 nccl_comm_num 可以加快GPU之间的通信效率,建议单机设置为1,多机设置为2。
dist_strategy = fleet.DistributedStrategy()
dist_strategy.nccl_comm_num = 2
设置合适的CPU线程数¶
PaddlePaddle Fluid使用“线程池” [5] 模型调度并执行Op,Op在启动GPU计算之前, 通常需要CPU的协助,然而如果Op本身占用时间很小,“线程池”模型下又会带来额外的调度开销。
根据以往的经验,对于CPU任务,num_threads=2 * dev_count 时性能较好,对于GPU任务,num_threads=4 * dev_count 时性能较好。注意:线程池不是越大越好。
dist_strategy = fleet.DistributedStrategy()
dist_strategy.thread_num = 3
提高网络的吞吐¶
多节点训练时网络的带宽常常成为训练的瓶颈。我们在实测中发现,当使用自动混合精度训练后,TCP
socket 的通信方式将成为训练速度的瓶颈, 使多节点训练无法充分利用 FLeet
混合精度计算带来的速度提升。 在我们实测中使用: 100Gb
网卡,RDMA
[7]
和
InfiniBand
[8]来提升网络带宽,使网络传输不会成为计算速度的瓶颈。
在开始训练前,需要正确设置以下 NCCL 环境变量使对应硬件设置生效:
Env Name |
Description |
---|---|
NCCL_SOCKET_IFNAME |
The RDMA device, e.g. eth2 |
NCCL_P2P_DISABLE |
Set to 1 to disable P2P transfer between GPUs |
NCCL_IB_DISABLE |
Set to 1 to disable using RDMA |
NCCL_IB_CUDA_SUPPORT |
Set to 1 to enable GPU Direct if supported |
NCCL_DEBUG |
Set debug level: VERSION, WARN, INFO |
预先分配足够的显存¶
通过环境变量 FLAGS_fraction_of_gpu_memory_to_use=0.7 设置预先分配的显存占比。 由于CUDA原生的显存分配cuMalloc和释放cuFree操作均是同步操作,非常耗时,因此 通过 设置 FLAGS_fraction_of_gpu_memory_to_use 成一个较大的值,比如0.7,可以显著地加速训练的速度。
0.7 是指 70%的显存会预先分配。设置的范围是0.0~1.0。
os.environ['FLAGS_fraction_of_gpu_memory_to_use'] = "0.98"
降低scope drop频率和fetch频率¶
减少scope drop和fetch频率,可以减少频繁的变量内存申请、释放和拷贝, 从而提升性能。
# 每 30 batch 之后清理一次临时变量
dist_strategy = fleet.DistributedStrategy()
dist_strategy.BuildStrategy = {'num_iteration_per_drop_scope': 30}
# 降低fetch频率,每 30 batch fetch 一次训练输出
for pass_id in xrange(PASS_NUM):
batch_id = 0
while True:
if batch_id % 30 == 0:
fetched = exe.run(fetch_list)
else:
exe.run([])
增大batch_size¶
分布式同步训练,跨节点通信或多或少会带来性能影响,增大训练的batch_size, 可以保持通信开销不变的情况下,增大计算吞吐从而降低通信在整个训练过程中的占比来提升总体的训练吞吐。
使用 DALI reader¶
数据读取的优化在GPU训练中至关重要,尤其在不断增加batch_size提升吞吐时,数据reader 可能成为训练速度的瓶颈。 Fleet 中可以使用 Nvidia DALI6 作为数据loader. 使用DALI的优点有:
使用GPU完成部分数据预处理,加速数据读取过程,减少 CPU 负担。
DALI 提供预取队列(perfetch queue)功能,让数据预处理和模型计算可以异步进行,减少模型计算对数据读取的等待。
import fleetx as X
model = X.applications.Resnet50()
loader = model.load_imagenet_from_file("/pathto/imagenet/train.txt", use_dali=True)
使用混合精度训练¶
V100 GPU提供了 Tensor Core 可以在混合精度计算 场景极大的提升性能。使用混合精度计算的例子可以参考文档 ` <https://todo/>`__
目前Paddle只提供在两个模型(ResNet, BERT)的混合精度计算实现并支持static loss scaling,其他模型使用混合精度也 可以参考以上的实现完成验证。
ResNet50训练示例¶
试验开始前我们已经在GPU 集群中提前配置好 RDMA 和 InfiniBand,减少网络通信的瓶颈,配置细节和具体硬件相关,可以参考`[rdma-x] <https://community.mellanox.com/s/article/what-is-rdma-x>`__
设置 AllReduce融合等参数¶
梯度融合中的16 和 50 是我们根据自身网络硬件和ResNet50 训练试验得出的经验值,用户可以根据自身硬件和模型进行调整。 0.7 是为了给 DALI loader 提前预留显存空间。
import os
os.environ['FLAGS_fuse_parameter_memory_size'] = "16"
os.environ['FLAGS_fuse_parameter_groups_size'] = "50"
os.environ['FLAGS_fraction_of_gpu_memory_to_use'] = "0.7"
添加依赖¶
import os
import fleetx as X
import paddle
import paddle.fluid as fluid
import paddle.distributed.fleet.base.role_maker as role_maker
import time
import paddle.distributed.fleet as fleet
定义分布式模式并初始化模型和reader¶
这里我们使用DALI reader 减少CPU 数据处理负担和数据读取瓶颈。
paddle.enable_static()
configs = X.parse_train_configs()
fleet.init(is_collective=True)
model = X.applications.Resnet50()
downloader = X.utils.Downloader()
local_path = downloader.download_from_bos(
fs_yaml='https://fleet.bj.bcebos.com/test/loader/small_imagenet.yaml',
local_path='./data')
batch_size = 32
loader = model.get_train_dataloader(local_path, batch_size=batch_size)
定义分布式相关策略¶
这里我们会开启上文中提到的各项训练优化策略,如:自动混合精度计算,OP 融合等。
dist_strategy = fleet.DistributedStrategy()
# distributed strategy
dist_strategy.sync_nccl_allreduce = True
dist_strategy.nccl_comm_num = 2
dist_strategy.fuse_all_reduce_ops = True
# build strategy
build_strategy = fluid.BuildStrategy()
build_strategy.enable_sequential_execution = True
build_strategy.fuse_elewise_add_act_ops = True
build_strategy.fuse_bn_act_ops = True
build_strategy.enable_auto_fusion = True
build_strategy.fuse_all_optimizer_ops = True
dist_strategy.build_strategy = build_strategy
# execute strategy
execution_strategy = fluid.ExecutionStrategy()
execution_strategy.num_threads = 3
execution_strategy.num_iteration_per_drop_scope = 100
execution_strategy.num_iteration_per_run = 1
dist_strategy.execution_strategy = execution_strategy
# amp
dist_strategy.amp = True
dist_strategy.amp_configs = {
"init_loss_scaling": 128,
"decr_every_n_nan_or_inf": 2,
"incr_every_n_steps": 1000,
"incr_ratio": 2.0,
"use_dynamic_loss_scaling": True,
"decr_ratio": 0.5,
"custom_white_list": [],
"custom_black_list": [],
}
dist_strategy.save_to_prototxt("dist_strategy.prototxt")
开始训练¶
optimizer = fluid.optimizer.Momentum(learning_rate=0.01, momentum=0.9)
optimizer = fleet.distributed_optimizer(optimizer, dist_strategy)
optimizer.minimize(model.loss)
place = fluid.CUDAPlace(int(os.environ.get('FLAGS_selected_gpus', 0)))
exe = fluid.Executor(place)
exe.run(fluid.default_startup_program())
for i, data in enumerate(loader()):
start_time = time.time()
cost_val = exe.run(model.main_prog,
feed=data,
fetch_list=[model.loss.name])
end_time = time.time()
print(
"worker_index: %d, step%d cost = %f, speed: %f"
% (fleet.worker_index(), i, cost_val[0], batch_size / (end_time - start_time)))
Fleetrun 一键启动¶
"xx.xx.xx.xx” 等是四个节点ips,每个节点 8 张 GPU卡, 共 32 GPU 并行训练。
fleetrun --ips="xx.xx.xx.xx, yy.yy.yy.yy, aa.aa.aa.aa, bb.bb.bb.bb" --gpus=0,1,2,3,4,5,6,7 example_collective.py
# worker_index: 0, step0 cost = 7.147776, speed: 34.481360
# worker_index: 0, step1 cost = 7.151375, speed: 408.405991
# worker_index: 0, step2 cost = 7.025396, speed: 509.624355
# worker_index: 0, step3 cost = 6.501647, speed: 533.641315
# worker_index: 0, step4 cost = 6.759287, speed: 520.999193
# worker_index: 0, step5 cost = 6.266363, speed: 536.729215
# worker_index: 0, step6 cost = 6.243353, speed: 522.510241
# worker_index: 0, step7 cost = 6.923586, speed: 519.478763
# worker_index: 0, step8 cost = 7.607512, speed: 534.919526
# worker_index: 0, step9 cost = 7.111218, speed: 508.371600
Fleet 训练策略¶
DistributedStrategy¶
Dist ributedStrategy |
类型 |
默认值 |
定义 |
---|---|---|---|
auto |
bool |
False |
自动 化框架参数优化 |
a_sync |
bool |
True |
指示 是否使用异步SGD 进行参 数更新,仅在PS erver模式中生效 |
sync _nccl_allreduce |
bool |
True |
指示是 否在每个通信线 程中中使用同步 allre duce,仅在Colle ctive模式中生效 ,通常在使用同 步allreduce后系 统的开销会降低 |
nccl_comm_num |
int |
1 |
nccl通信器数量. nccl通信器数量 nccl_comm_num 可以加快GPU之 间的通信效率, 建议单机设置为 1,多机设置为2 。针对CPU线程数 num_threads ,建议单机设置 为1,多机设置为 nccl_comm_num +1 |
use_hierarc hical_allreduce |
bool |
False |
分级式allred uce,对于多机模 式,针对小数据 量的通信,Ring AllReduc e通信效率低,采 用Hierarchical AllReduce可 以解决该问题。 |
hiera rchical_allredu ce_inter_nranks |
int |
1 |
在 分级式allreduc e,低层级groups 中的 r ank数。一般等于 单个GPU节点中的 GPU数 |
sync_batch_norm |
bool |
False |
表示是否使 用同步的批正则 化,即在训练阶 段通过多个设备 同步均值和方差 。当前的实现不 支持FP16训练和C PU。并且目前* 仅支持*仅在 一台机器上进行 同步式批正则。 |
fuse _all_reduce_ops |
bool |
True |
默认情况下会 将同一layer中参 数的梯度的AllR educe操作合并成 一个,比如对于 fluid.layers.fc 中有 Weight和Bias两 个参数,打开该 选项之后,原本 需要两次AllRed uce操作,现在只 用一次AllReduce 操作。 |
fuse_ grad_size_in_MB |
int |
32 |
每个AllReduce操 作的梯度字节数 |
fuse_grad _size_in_TFLOPS |
int |
20 |
指 定每次AllReduc e操作的最大层数 ,即到达该层数 就进行AllReduce |
cudnn_ex haustive_search |
bool |
True |
表示是 否使用穷举搜索 方法来选择卷积 算法。在cuDNN中 有两种搜索方法 ,启发式搜索和 穷举搜索。穷举 搜索尝试所有cu DNN算法以选择其 中最快的算法。 此方法非常耗时 ,所选择的算法 将针对给定的层 规格进行缓存。 一旦更改了 图层规格(如bat ch大小,feature map大小), 它将再次搜索。 |
conv_works pace_size_limit |
int |
4000 |
用 于选择cuDNN卷积 算法的工作区限 制大小(单位为 MB)。cuDNN的内 部函数在这个内 存限制范围内获 得速度最快的匹 配算法。通常, 在较大的工作区 内可以选择更快 的算法,但同时 也会显著增加内 存空间。用户需 要在内存和速度 之间进行权衡。 |
cudn n_batchnorm_spa tial_persistent |
bool |
True |
表示是否在 batchnorm中使用 新的批量标准化 模式CUDNN_BATC HNORM_SPATIAL_P ERSISTENT函数。 |
BuildStrategy¶
BuildStrategy |
类型 |
默认值 |
定义 |
---|---|---|---|
enable_seque ntial_execution |
bool |
False |
如果 设置为True,则 算子的执行顺序 将与算子定义的 执行顺序相同。 |
fuse_elew ise_add_act_ops |
bool |
False |
表明 是否融合(fuse) elementwise_add _op和activation _op。这会使整体 执行过程更快。 |
fuse_bn_act_ops |
bool |
False |
表明 是否融合(fuse) batch_norm_op 和 activation _op。这会使整体 执行过程更快。 |
fuse_relu _depthwise_conv |
bool |
False |
表明 是否融合(fuse) relu和 depthwise_conv 2d,节省GPU内存 并可能加速执行 过程。此选项仅 适用于GPU设备。 |
fus e_broadcast_ops |
bool |
False |
表明 是否融合(fuse) broadcast ops。 该选项指在Reduc e模式下有效,使 程序运行更快。 |
fuse_al l_optimizer_ops |
bool |
False |
表明 是否融合(fuse) 是否融合 optimiz er_op,仅对部分 optimizer 可用 (SGD、Adam和M omentum),可使 程序运行更快。 |
enable_inplace |
bool |
False |
表明是 否Op的输出复用O p输入的显存空间 ,优化显存占用 |
ena ble_backward_op timizer_op_deps |
bool |
True |
在反向操作 和参数更新操作 之间添加依赖, 保证在所有的反 向操作都运行结 束之后才开始运 行参数更新操作. 在 多卡训练时,打 开该选项可能会 提升训练速度。 |
cache_ runtime_context |
bool |
False |
unkown |
ExecutionStrategy¶
Ex ecutionStrategy |
类型 |
默认值 |
定义 |
---|---|---|---|
num_threads |
int |
1 |
表示当前 Executor 的线程池(thread pool)的大小, 此线 程池可用来并发 执行program中的 operator(算子 ,运算)。如果 num_threads=1 ,则所有 的operator将一 个接一个地执行 ,但在不同的pro gram重复周期(it erations)中执行 顺序可能不同。 |
num_iteration _per_drop_scope |
int |
10 |
该选项表 示间隔多少次迭 代之后清理一次 临时变量。模型 运行过程中,生 成的中间临时变 量将被放到local execution scope中,为了 避免对临时变量 频繁的申请与释 放,通常将其设 为较大的值(比 如10或者100)。 |
num_it eration_per_run |
int |
3 |
它配置了当用户 在python脚本中 调用pe.run()时 执行器会执行的 迭代次数。Execu tor每次调用,会 进行num_iterat ion_per_run次训 练,它会使整体 执行过程更快。 |
use _thread_barrier |
bool |
False |
当使用 PServer 模式时为 True |
自动混合精度练加速分布式训练¶
简介¶
在使用数据并行分布式训练的同时, 我们还可以引入自动混合精度(Auto Mixed Precision) 来进一步提升训练的速度.
主流的神经网络模型通常使用单精度 single-precision
(FP32)
数据格式来存储模型参数、进行训练和预测. 在上述环节中使用半精度
half-precision
(FP16)
来代替单精度. 可以带来以下好处:
减少对GPU memory 的需求: GPU 显存不变情况下, 支持更大模型 / batch size
降低显存读写时的带宽压力
加速GPU 数学运算速度 (需要GPU 支持[1])
GPU上 FP16 吞吐是FP32 的 2 - 8 倍[2]
Paddle 支持自动混合精度计算, 并实现了 自动维护FP32 、FP16参数副本
,
Dynamic loss scaling
, op黑白名单
等策略来避免
因 FP16 动态范围较小而带来的模型最终精度损失。 Fleet 作为Paddle通用的分布式训练API提供了简单易用的接口, 用户只需要添加几行代码
就可将自动混合精度应用到原有的分布式训练中进一步提升训练速度.
下文将通过一个简单例子介绍如如何通过 Fleet将实现混合精度的分布式训练, 另外给出我们使用 Fleet 进行同步训练加速的实践。
AMP 快速开始¶
这里以在单机多卡上训练Resent50 为简单例子介绍Fleet 中 AMP的用法.
自动混合精度原理¶
FP32 参数副本及更新¶

如上图所示, 在AMP 中, 模型参数 weight
,
前向中间的结果activation
, 反向的gradient
都以FP16 形式存储,
由此可以减少模型占用的显存空间,同时提高计算和通信速度,也就是使得训练吞吐更大,训练更快.
Paddle框架会为每一个weight
维护一个FP32副本, 用于参数更新.
Loss scaling¶

如上图所示, 实际情况中模型训练中的某些变量, 比如grad
(特别是
activation
的 grad
), 可能会因小于 FP16的精度低而变成0
;
另一方面在FP16 的表示范围的中有很大的一部分(从最大值往左) 却没有被利用到.
对gradient 做一个整体的放大, 能够更充分的利用FP16 的表示范围.
Fleet AMP 会在反向开始前对 loss 进行 up scaling, 并在执行任何梯度相关操作(e.g. gradient-clip, update) 之前对 gredient 进行 down scaling 恢复原来的大小.
scaling factor
的设置是 Lossing scaling 的关键, Fleet AMP 提供
Dynamic loss scaling
(默认) 和 Constant loss scaling
两种scaling 策略:
Constant loss scaling: 设置
use_dynamic_loss_scaling = False
和init_loss_scaling (float)
Dynamic loss scaling: scaling 中面临的问题是当
scaling up 不足
时, 仍会有部分较小变量会被表示成 0而损失精度; 当scaling up 过度
时, 变量超过FP16表示范围出现 nan or inf, 同样造成精度损失. 此策略采用自动 gradient 值检测的方式:当连续
incr_every_n_steps(int)
个batch 中所有的gradient 都在FP16 的表示范围, 将scaling factor 增大incr_ratio(float)
倍;当有连续
decr_every_n_nan_or_inf(int)
个batch 中gradient 里出现 nan / inf时, scaling factor 缩小decr_ratio(float)
倍.上述四个参数Fleet 提供的默认值可以满足绝大部分要求, 用户通常不需要修改.
如下图所示在 Dynamic loss scaling 中,框架在每一个 iteration
都会依据当前 gradients 是否出现 nan
or inf
还有用户设置的
Dynamic loss scaling 参数来动态调整 loss scaling factor
的大小,将gradient 尽量保持在 FP16 的表示范围之内。

OP 黑白名单¶
模型中的某些Operation (OP)
可能对精度较为敏感, 为了确保AMP
中精度无损, 可以通过OP 黑白名单
对具体OP 操作的精度做指定.
白名单: OP 操作在FP16精度下进行,
input
: 如果不是FP16 会被首先cast 成FP16后再输入OP.output
: FP16黑名单: OP 操作在FP32精度下进行,
input
: 如果不是FP32 会被首先cast 成FP32后再输入OP.output
: FP32灰名单: 所有不在黑或白名单里的OP. 仅当OP 所有 inputs 都是 FP16精度时, 操作才在FP16精度下进行, 否着以FP 32进行.
input / output
: 和原始输入中的最高精度相同
Fleet 已经预设了一个能够覆盖绝大多数模型OPs的黑白名单,
通常情况下用户并不需要修改, 但是如果任务对精度有特殊要求,
或者希望新增自定义 OP, 用户可以通过
paddle.distributed.fleet.DistributedStrategy.amp_configs 中的
custom_white_list
和 custom_black_list
进行指定. 同是,
用户还可以通过custom_black_varnames
,
来具体指定Paddle program
某一个 var
必须使用FP32精度.
我们将在文末的 appendix中 进一步介绍 Fleet 的黑白名单设置及其影响。
开始训练¶
添加依赖¶
首先我们要导入依赖和定义模型和 data loader, 这一步和Fleet 下其他任务基本一致.
import os
import fleetx as X
import paddle
import paddle.fluid as fluid
import paddle.distributed.fleet.base.role_maker as role_maker
import time
import paddle.distributed.fleet as fleet
定义分布式模式并初始化¶
paddle.enable_static()
configs = X.parse_train_configs()
fleet.init(is_collective=True)
加载模型及数据¶
model = X.applications.Resnet50()
downloader = X.utils.Downloader()
local_path = downloader.download_from_bos(
fs_yaml='https://fleet.bj.bcebos.com/test/loader/small_imagenet.yaml',
local_path='./data')
batch_size = 32
loader = model.get_train_dataloader(local_path, batch_size=batch_size)
定义分布式及AMP 相关策略¶
如上文描述, 用户可以选择设置 Loss scaling
和
OP黑白名单
等的参数.
另外 Fleet 将AMP 实现为 meta optimizer, 用户需要指定其的
inner-optimizer
. Fleet AMP支持所有 paddle optimziers 和 FLeet meta
otpimizers 作为其 inner-optimizer.
dist_strategy.amp = True
dist_strategy.amp_configs = {
"init_loss_scaling": 32768,
"decr_every_n_nan_or_inf": 2,
"incr_every_n_steps": 1000,
"incr_ratio": 2.0,
"use_dynamic_loss_scaling": True,
"decr_ratio": 0.5,
"custom_white_list": [],
"custom_black_list": [],
}
optimizer = fluid.optimizer.Momentum(learning_rate=0.01, momentum=0.9)
optimizer = fleet.distributed_optimizer(optimizer, dist_strategy)
optimizer.minimize(model.loss)
开始训练¶
这一部分和Fleet 中其他任务基本相同:
place = fluid.CUDAPlace(int(os.environ.get('FLAGS_selected_gpus', 0)))
exe = fluid.Executor(place)
exe.run(fluid.default_startup_program())
for i, data in enumerate(loader()):
start_time = time.time()
cost_val = exe.run(model.main_prog,
feed=data,
fetch_list=[model.loss.name])
end_time = time.time()
print(
"worker_index: %d, step%d cost = %f, speed: %f"
% (fleet.worker_index(), i, cost_val[0], batch_size / (end_time - start_time)))
运行训练脚本¶
一行启动单机多卡分布式训练:
fleetrun --gpus 0,1,2,3,4,5,6,7 --log_dir log example_amp.py
# worker_index: 0, step0 cost = 6.895311, speed: 12.192901
# worker_index: 0, step1 cost = 6.964077, speed: 412.116618
# worker_index: 0, step2 cost = 7.049311, speed: 433.850506
# worker_index: 0, step3 cost = 7.006689, speed: 358.400410
# worker_index: 0, step4 cost = 7.000206, speed: 398.210745
# worker_index: 0, step5 cost = 7.088611, speed: 462.322357
# worker_index: 0, step6 cost = 7.022367, speed: 425.185013
Fleet 黑白名单设置¶
上文简要介绍了Fleet 中黑白名单的 API 接口, 下文将进一步介绍 Fleet 中黑白名单的实现和可能对训练造成影响。 目前 Fleet 中 AMP 的默认黑白名单如下, 其他未列出的 op 都属于灰名单:
white_list = {
'conv2d',
'matmul',
'mul',
}
black_list = {
'exp',
'square',
'log',
'mean',
'sum',
'cos_sim',
'softmax',
'softmax_with_cross_entropy',
'sigmoid_cross_entropy_with_logits',
'cross_entropy',
'cross_entropy2',
}
黑白名单设置¶
白名单中只有卷积和乘法运算,这样的设置能够满足大部分的 CV 场景的模型加速(Vgg、ResNet), 因为卷积计算占据这些模型计算和内存访问开销的很大一部分, 其他 ops 的开销只占很小一部分。 对于 主要开销在 RNN 计算的 NLP 模型,目前的 AMP 实现提速并不是很明显。
黑名单中的 op 可以分为3 大类: * 对精度非常敏感的 op:
softmax
,cross_entropy
等。 *
输出相对于输入有更大动态范围的op(f(x) >>
x):exp
,square
, log
等。 * reduce 类型的op:
mean
,sum
等。
所以,用户希望判断新的自定义op是否需要加入黑名单时,可以参考上述3个类型。
需要注意: 一些常用的 op 如 BatchNorm
, pooling
, relu
属于灰名单,这意味着这些 op 的数据类型决定于之前的 op 的类型;
另外并行分布式计算使用 AMP之后,gradient-allreduce 是在FP16 中进行的。
自动化op 插入¶
在训练开始前,框架会根据黑白名单在前向和反向网络自动插入 cast op, 如: * 前向中插入 FP32toFP16 cast, 将 FP32 的layer parameter 副本 cast 成 FP16, 进行 FP16 conv 计算。 * 反向中插入 FP16toFP32 cast, 将等到的 FP16 gradient cast 成 FP32, 然后更新 FP32 的parameter 副本。
cast op 虽然会带来额外的开销, 但是在诸如 Vgg、ResNet 等主要由重复的 conv layer 串行的而成 CV 模型中, 只需要cast input 和 每一层的param,并不需要cast 模型的中间结果,这样 cast 操作带来的开销较少, 容易倍半精度计算带来的加速覆盖;但是如果模型的串行 layers 序列中存在较多的黑名单 op(e.g. conv --> log --> conv --> square --> conv), 这样模型的中间结果需要进行多次 FP32toFP16 和 FP16toFP32 cast, cast 开销将会急剧增大,从而抵消半精度带来的加速。
可能不适用 AMP 加速的情况¶
RNN 为主的 NLP 模型
模型组网中有较多黑名单 op 的模型
对数据精度敏感的任务(Adversarial Attacking in ML)
图像 Input Layout 格式¶
CV 模型训练时了达到最佳速度,不同场景下推荐使用不同图像 Layout:
FP32:
NCHW
自动混合精度:
NHWC
# when build dataloader
loader = model.load_imagenet_from_file("./ImageNet/train.txt",
batch_size=args.batch_size,
data_layout="NHWC")
# when build model
if data_format == "NHWC":
img_shape = [None, 224, 224, 3]
else:
img_shape = [None, 3, 224, 224]
image = fluid.data( name="feed_image", shape=img_shape, dtype="float32", lod_level=0)
conv = fluid.layers.conv2d(input=input, data_format= "NHWC")
推荐阅读:¶
如果需要对自动混合精度做定制化修改,或更深入理解AMP中原理和实现推荐阅读:
使用超大Batch进行训练¶
简介 + strategy列表¶
为了追求模型的性能不断提升,人们对更大规模的数据集、更深的网络层、更庞大的参数规模应运而生。但是随之而来的就是给模型训练带来了巨大的压力,因此分布式技术及定制化AI芯片应运而生。但在分布式训练中,经常会遇到显存或者内存不足的情况,通常是以下几点原因导致的:
输入的数据过大,例如视频类训练数据。
深度模型的参数过多或过大,所需的存储空间超出了内存/显存的大小。
AI芯片的内存有限。
为了能正常完成训练,我们通常只能使用较小的Batch Size以降低模型训练中的所需要的存储空间,这将导致很多模型无法通过提高训练时的Batch Size来提高模型的精度。为了解决这个问题,Fleet中提供了两种策略,使得模型可以使用超大Batch的方式完成训练:
Forward Recomputation Backpropagation(FRB): 通过清除正向计算过程中的中间计算结果,来降低训练过程中使用的存储空间,从而确保硬件有足够的内存做更大Batch Size的训练。
Gradient Merge: 在训练过程中,将连续多个Batch数据训练得到的梯度合并更新模型参数的策略。在该训练策略下,虽然从形式上看依然是小Batch规模的数据在训练,但是效果上可以达到多个小Batch数据合并成大Batch后训练的效果。
原理¶
Forward Recomputation Backpropagation¶
我们知道,深度学习网络的一次训练迭代包含三个步骤:
前向计算: 运行前向算子(Operator) 来计算中间隐层(Variable)的值 。
反向计算: 运行反向算子来计算参数(Parameter)的梯度。
优化: 应用优化算法以更新参数值 。
在前向计算过程中,前向算子会计算出大量的中间结果,由于这些中间结果是训练数据和算子计算得到的,所以训练数据的Batch Size越大,中间结果占用的内存也就越大。飞桨核心框架会使用 Variable来存储这些隐层的中间结果。当模型层数加深时,其中间结果的数量可达成千上万个, 占据大量的内存。虽然飞桨核心框架的显存回收机制会及时清除无用的中间结果,以节省存储。 但是有些中间结果是反向计算过程中算子的输入,这些中间结果必须存储在内存中,直到相应的反向算子计算完毕。
对于大小固定的内存来说,如果用户希望使用大Batch Size的数据进行训练,则将导致单个中间结果占用内存增大,那么就需要减少中间结果的存储数量,FRB就是基于这种思想设计的。
FRB是将深度学习网络切分为k个部分(segments)。对每个segment而言:前向计算时,除了小部分必须存储在内存中的Variable外,其他中间结果都将被删除;在反向计算中,首先重新计算一遍前向算子,以获得中间结果,再运行反向算子。简而言之,FRB和普通的网络迭代相比,多计算了一遍前向算子。
我们把切分网络的变量叫做checkpoints。 那么问题来了,如何选择checkpoints呢?自从FRB方法提出以来,大量学者在研究这一关键问题。 我们知道深度学习网络通常是由一个个模块串联得到的,比如ResNet-50由16个block串联而成, Bert-Large由24个transformer串联而成,以两个子模块中间的变量作为切分点就是一个很好的选择。 对于非串联的网络(比如含有大量shortcut结构的网络),FRB也支持对其做切分, 只是可能多耗费一点内存(用于存储shortcut的Variable)。
Gradient Merge¶
与FRB相比,Gradient Merge并没有像FRB那样对内存的使用做出大刀阔斧般的改动,只是在训练流程上做了一些微调,达到模拟出大Batch Size训练效果的目的。具体来说,就是使用若干原有大小的Batch数据进行训练,即通过“前向+反向” 网络计算得到梯度。其间会有一部分显存/内存用于存放梯度,然后对每个Batch计算出的梯度进行叠加,在计算完所有Batch后,使用累加的梯度对模型进行参数更新,从而达到使用大Batch数据训练的效果。
GradientMerge 策略在使用方面也很简单,用户只需要定义将多少Batch的数据计算出的梯度叠加更新模型参数,便可以实现大Batch训练的目的。
操作实践¶
该章节中我们将基于BERT模型的实用样例,分别对这两个增大Batch的策略进行讲解。从整体来看,训练脚本的编写主要分为4个部分:
添加训练脚本运行所必须的依赖包。
定义分布式模式并初始化。
加载模型及数据。
定义训练策略和优化器,在这一步我们可以选择使用FRB或者Gradient Merge策略来增大BatchSize。
下面我们来分别介绍FRB和Gradient Merge两种策略所对应脚本的编写方法(bert_recompute.py 及 bert_gradient_merge.py)。
Forward Recomputation Backpropagation¶
添加依赖¶
首先我们需要添加训练中所用到的python模块,fleetx
可以用于加载我们为用户封装的接口如:加载模型及数据,模型训练等。paddle.distributed.fleet
中定义了丰富的分布式策略供用户使用。
# -*- coding: UTF-8 -*-
import paddle
import fleetx as X
import paddle.fluid as fluid
import paddle.distributed.fleet as fleet
定义分布式模式并初始化¶
通过X.parse_train_configs()
接口,用户可以定义训练相关的参数,如:学习率、衰减率等。同时通过fleet.init()
接口定义了分布式模型,下面代码中的is_collective=True
表示采用集合通信的GPU分布式模式训练模型。
paddle.enable_static()
configs = X.parse_train_configs()
fleet.init(is_collective=True)
加载模型及数据¶
用户可以通过X.applications
接口加载我们预先定义好的模型,如:Resnet50、VGG16、BERT等。并使用定制化的data_loader加载模型,同时可以定义训练中使用的batch_size等参数。下面的例子中,我们使用了recompute对Bert_large模型所支持的最大Batch
Size(130)来进行训练。
与此同时,用户可以使用我们的`Downloader`接口下载预先保存的Wiki数据集。
model = X.applications.BertLarge()
downloader = X.utils.Downloader()
local_path = downloader.download_from_bos(
fs_yaml='https://fleet.bj.bcebos.com/small_datasets/yaml_example/wiki_cn.yaml',
local_path='./data')
data_loader = model.get_train_dataloader(
local_path,
max_seq_len=512,
batch_size=130,
)
定义Recompute Strategy 及 Optimizer¶
接下来我们就可以定义分布式训练中所应用到的策略了。下面的例子中,为了使用Recompute策略,我们将dist_strategy.recompute
设置为True
并设置我们事先定义好的checkpoints。
接下来用户需要定义训练中更新模型所用到的优化器,并使用fleet.distributed_optimizer
接口将优化器转换为分布式模式。
最后运行optimizer.minimize(model.loss)
将反向计算的算子插入训练网络,我们就可以开始训练了。
dist_strategy = fleet.DistributedStrategy()
# 使用Recompute,并设置checkpoints
dist_strategy.recompute = True
dist_strategy.recompute_configs = {"checkpoints": model.checkpoints}
optimizer = fluid.optimizer.Adam(learning_rate=configs.lr)
optimizer = fleet.distributed_optimizer(optimizer, dist_strategy)
optimizer.minimize(model.loss)
开始训练¶
在 FleetX 中,我们为用户提供了X.MultiGPUTrainer
接口,用于GPU分布式训练。其中model
及 data_loader
分别为第二步中加载的模型及数据。start_step
表示开始打印训练log的步数,若用户想复现我们的模型训练速度数据建议设置成10或者更大的数;若用户想查看模型的收敛情况,则可设置成0。
trainer = X.MultiGPUTrainer()
trainer.fit(model, data_loader, epoch=10)
效果测试¶
我们在BERT模型上对recompute的效果进行了测试,使用Recompute后Batch size可以扩大9倍多。与混合精度一起使用时,Batch_size可以进一步扩大。其中,速度记录的是分布式训练任务每秒可以训练的样本数。
Model |
Baseline |
Recompute |
Recompute + mixed precision |
---|---|---|---|
Batch size |
14 |
130 |
145 |
speed |
69.92 sents/s |
45.76 sents/s |
75.84 sents/s |
Gradient Merge¶
下面,我们介绍如何使用 Gradient Merge 来扩大BERT模型分布式训练中的 Batch Size(假设脚本名称为bert_gradient_merge.py):
与 Forward Recompute Backpropagation 相同,我们首先要添加依赖,定义分布式模式并加载模型及数据。
添加依赖¶
# -*- coding: UTF-8 -*-
import paddle
import fleetx as X
import paddle.fluid
import paddle.distributed.fleet as fleet
定义分布式模式并初始化¶
加载模型及数据¶
model = X.applications.Bert_large()
downloader = X.utils.Downloader()
local_path = downloader.download_from_bos(
fs_yaml='https://fleet.bj.bcebos.com/small_datasets/yaml_example/wiki_cn.yaml',
local_path='./data')
data_loader = model.(
local_path,
max_seq_len=512,
batch_size=13,
)
定义Gradient Merge Strategy 及 Optimizer¶
在上面的代码中,我们定义了Batch
Size为13,在这一步中,通过设置k_steps
,使用4个Batch
Size来模拟一个大Batch的训练,从而达到了Batch size为52的训练效果。
在gradient_merge_configs
中,avg选项用于控制梯度累计的形式:当被设置为
True
时,会对每次的梯度求和并做平均;反之将直接对梯度求和,并对参数进行更新。
dist_strategy = fleet.DistributedStrategy()
# 使用Gradient merge策略并设置相关参数
dist_strategy.gradient_merge = True
dist_strategy.gradient_merge_configs = {"k_steps": 4, "avg": True}
optimizer = fluid.optimizer.Adam(learning_rate=configs.lr)
optimizer = fleet.distributed_optimizer(optimizer, dist_strategy)
optimizer.minimize(model.loss)
开始训练¶
Gradient Merge 的训练代码与 Recompute 策略相同,用户使用两行代码即可开始训练:
trainer = X.MultiGPUTrainer()
trainer.fit(model, data_loader, start_step=10)
运行训练脚本¶
fleetrun --gpus 0,1,2,3,4,5,6,7 bert_gradient_merge.py
使用LARS / LAMB 优化分布式超大batch 训练¶
简介¶
在数据并行分布式训练场景中, 常使用增加GPU数量的方式来加速训练. 为了保证GPU的算力得到充分利用, 每张GPU卡上的batch size都需要足够大. 因此在增加GPU 数量同时, 训练的全局batch size 也会变大.
但越大的全局batch size 会带来训练的收敛问题[1] [2]:
模型最终精度损失
收敛速度变慢, 需要更多的epoch 才能收敛
LARS[3] 和 LAMB[4] 两个优化策略常用来解决上述超大batch 训练中的收敛问题.
Paddle 实现了这两种优化策略,paddle.distributed.fleet 作为Paddle通用的分布式训练API提供了简单易用的接口, 用户只需要添加几行代码 就可将策略加入到原有的训练中。 通过这两个优化策略, 我们在超大batch 场景中实现了更快的收敛速度和无损的精度, 结合Fleet 中其他的策略(e.g. AMP) 可以极大缩短的训练整体的time2train.
试验效果¶
resnet50 imagenet |
Global batch size |
epoch |
top1 |
---|---|---|---|
[Goyal et al] |
8k |
90 |
76.3% |
LARS Paper |
32k |
90 |
72.3% |
[fleet: lars + amp] |
16k |
60 |
76.2% |
[fleet: lars + amp] |
32k |
62 |
75.9% |
LARS¶
我们以在单机多卡上Resent50 训练为简单例子介绍fleet 中 LARS的用法。
import os
import fleetx as X
import paddle
paddle.enable_staic()
import paddle.fluid as fluid
import paddle.distributed.fleet.base.role_maker as role_maker
import time
import paddle.distributed.fleet as fleet
通过X.parse_train_configs()
接口,用户可以定义训练相关的参数,如:学习率、衰减率等。同时通过fleet.init()
接口定义了分布式模型,下面代码中的is_collective=True
表示采用集合通信的GPU分布式模式训练模型。
paddle.enable_static()
configs = X.parse_train_configs()
fleet.init(is_collective=True)
用户可以通过X.applications
接口加载我们预先定义好的模型,如:Resnet50、VGG16、BERT等。并使用定制化的data_loader加载模型,同时可以定义训练中使用的batch_size等参数。
model = X.applications.Resnet50()
downloader = X.utils.Downloader()
local_path = downloader.download_from_bos(
fs_yaml='https://fleet.bj.bcebos.com/test/loader/small_imagenet.yaml',
local_path='./data')
batch_size = 32
loader = model.get_train_dataloader(local_path, batch_size=batch_size)
LARS 优化算法的公式如下:
可以看到LARS 其实是在 带weight decay
的momentum
优化器的基础上加入了local learning rate
的逻辑,
对每一层的learning rate
进行了放缩. fleet 将 LARS实现为一个 fleet
meta optimizer, 在使用时需要设置一下几点:
LARS meta optimizer 的 inner optimizer 必须为
momentum
, 并在 momentum 中定义mu
和lr
参数.在 fleet dist_strategy 定义LARS 特有的
lars_coeff
参数和lars_weight_decay
参数.LARS 已经将
weight decay
包含进公式中, 用户不需要再在 optimizer中设置regularization
.fleet 中还提供 lars_weight_decay 过滤策略, 可以通过在
exclude_from_weight_decay
参数加入对应layer 的name string
, 让这一 layer 的参数不进行 lars weight decay. (通常我们将``BN`` 参数 和FC_bias
从lars weight decay 中过滤)
dist_strategy = fleet.DistributedStrategy()
dist_strategy.lars = True
dist_strategy.lars_configs = {
"lars_coeff": 0.001,
"lars_weight_decay": 0.0005,
"exclude_from_weight_decay": ['batch_norm', '.b_0']
}
optimizer = fluid.optimizer.Momentum(learning_rate=0.01, momentum=0.9)
optimizer = fleet.distributed_optimizer(optimizer, dist_strategy)
optimizer.minimize(model.loss)
这一部分和fleet 中其他任务基本相同:
place = fluid.CUDAPlace(int(os.environ.get('FLAGS_selected_gpus', 0)))
exe = fluid.Executor(place)
exe.run(fluid.default_startup_program())
for i, data in enumerate(loader()):
start_time = time.time()
cost_val = exe.run(model.main_prog,
feed=data,
fetch_list=[model.loss.name])
end_time = time.time()
print(
"worker_index: %d, step%d cost = %f, speed: %f"
% (fleet.worker_index(), i, cost_val[0], batch_size / (end_time - start_time)))
运行训练脚本¶
完成上述脚本的编写后,我们就可以使用以下命令一行启动单机多卡分布式训练:
fleetrun --gpus 0,1,2,3,4,5,6,7 --log_dir log example_lars.py
LAMB¶
我们以在单机多卡上Bert 训练为简单例子介绍fleet 中LAMB 的用法.
import os
import fleetx as X
import paddle
paddle.enable_staic()
import paddle.fluid as fluid
import paddle.distributed.fleet.base.role_maker as role_maker
import time
import paddle.distributed.fleet as fleet
这一步和上文中的LARS 一致。
paddle.enable_static()
configs = X.parse_train_configs()
fleet.init(is_collective=True)
这一步和上文中的LARS 一致。
model = X.applications.Resnet50()
downloader = X.utils.Downloader()
local_path = downloader.download_from_bos(
fs_yaml='https://fleet.bj.bcebos.com/test/loader/small_imagenet.yaml',
local_path='./data')
batch_size = 32
loader = model.get_train_dataloader(local_path, batch_size=batch_size)
LAMB 优化算法的公式如下:
和LARS 类似, LAMB 也是在内层优化器的基础上,
套了一个local learning rate
的逻辑, 对每一层的learning rate
进行了放缩. fleet 将 LAMB实现为一个 fleet meta optimizer,
在使用时需要设置一下几点:
LAMB meta optimizer 的 inner optimizer 必须为
adam
, 并在 adam 中定义 学习率lr
, 一阶 moment 的指数衰减率beta1
和二阶moment 的指数衰减率beta2
参数.在 fleet dist_strategy 定义LAMB 特有的
lamb_weight_decay
参数.LAMB 已经将
weight decay
包含进公式中, 用户不需要再在 optimizer中设置regularization
.fleet 中还提供 lamb_weight_decay 过滤策略, 可以通过在
exclude_from_weight_decay
参数加入对应layer 的name string
, 让这一 layer 的参数不进行 lars weight decay. (通常我们将``LN`` 从lamb weight decay 中过滤)
dist_strategy = fleet.DistributedStrategy()
dist_strategy.lamb = True
dist_strategy.lamb_configs = {
'lamb_weight_decay': 0.01,
'exclude_from_weight_decay': ['layer_norm'],
}
optimizer = paddle.optimizer.Adam(learning_rate=0.01, beta1=0.9, beta2=0.999)
optimizer = fleet.distributed_optimizer(optimizer, dist_strategy)
optimizer.minimize(model.loss)
这一部分和fleet 中其他任务基本相同:
place = fluid.CUDAPlace(int(os.environ.get('FLAGS_selected_gpus', 0)))
exe = fluid.Executor(place)
exe.run(fluid.default_startup_program())
for i, data in enumerate(loader()):
start_time = time.time()
cost_val = exe.run(model.main_prog,
feed=data,
fetch_list=[model.loss.name])
end_time = time.time()
print(
"worker_index: %d, step%d cost = %f, speed: %f"
% (fleet.worker_index(), i, cost_val[0], batch_size / (end_time - start_time)))
运行训练脚本¶
完成上述脚本的编写后,我们就可以使用以下命令一行启动单机多卡分布式训练:
fleetrun --gpus 0,1,2,3,4,5,6,7 --log_dir log resnet50_lamb.py
使用Fleet进行异构参数服务器训练¶
异构参数服务器目前仅支持在静态图下运行
什么是异构参数服务器?¶
在开始使用异构参数服务器
前,您需要先了解参数服务器的基本知识。我们先进行简单回顾:
参数服务器的应用领域以及解决的问题¶
参数服务器集中应用在NLP
、推荐
以及
搜索
领域,其主要针对以下两个问题:
大数据:
原始数据集庞大,动辄几百G的数据量,单机训练速度难以承受,需要依赖数据并行加速训练,以此解决大数据的问题。
大参数:
在上述场景中,数据量大的同时,伴随着特征的稀疏性,开发者们通常使用Embedding技术来将业务场景中的高维稀疏特征向量转化为低维的稠密特征向量。
在工业场景中,该Embedding参数的维度往往是亿级,占用的存储空间在上百GB,单机内存以及单卡显存无法承受完整的参数保存。但我们可以观察到,使用每个batch的数据进行训练时,并不会用到全部的稀疏参数,仅需与样本有关的部分参数,内存或显存可以承受。因此在这种场景下,开发者们通常使用参数服务器模式,将大参数分片放到各个
Server
上。Worker
训练模型时,仅请求当前batch数据需要用到的参数,以此解决大参数的问题。
传统参数服务器的局限¶
当前参数服务器的Worker
节点,通常使用CPU或GPU机器完成模型训练中的前向与反向部分。
当Worker
使用的设备确定,其硬件算力的配比也随之固定。固定的算力配比,在工业应用中,存在着以下问题:
GPU机器利用率较低
若训练的模型不复杂,如推荐领域常用的
DeepFM
、LR
,模型计算耗时并不高,而数据读取(IO)的性能决定了网络训练的速度,也同时限制了GPU机器的利用率,简单的将IO任务交给GPU机器上的CPU,或使用Nvidia安倍架构+DALI处理数据不能解决该问题。CPU机器算力有瓶颈
CPU机器通常核心数较多,并且机器价格也更便宜,可以充分利用CPU多核的优势,在简单模型上极大的提升数据吞吐,整体训练达到较好的性能。但是,随着深度学习模型的日渐复杂,在一些计算能力要求高的模型中,比如
BERT
,计算能力严重不足,模型计算耗时极高。新型算力接入成本较大
随着AI芯片发展日新月异,各种高算力低成本的芯片已进入工业实用化阶段。但是开发者们使用AI芯片的门槛依然较高,例如软件栈的改变,集群的迁移等,新硬件的接入成本较高。
异构参数服务器介绍¶
那么,可不可以动态调整机器配比?同时解决IO瓶颈以及算力瓶颈,并且快速支持新硬件的接入呢?
PaddlePaddle基于工业实践,创新性的提出了异构参数服务器,支持不同算力的芯片混合异构训练,如CPU、v100,p40,昆仑芯片,对不同算力的芯片高效利用,使得训练任务对设备不敏感,获得更高的加速效果。

异构参数服务器基本原理¶
一个深度学习模型的训练过程可以拆分为三步:1、前向计算Loss;2、反向计算梯度;3、利用梯度更新参数。
参数服务器模式下,前向及反向步骤在Worker
端(也称为Trainer
)完成,参数更新在Server
端完成。
异构参数服务器模式中,我们进一步拆分前向及反向,可以将embedding查表,输入数据的reshape等IO密集型的OP放置于CPU-Trainer
,将fc,attention等计算密集型的OP放置于Heter-Trainer
。
CPU-Trainer
和Heter-Trainer
之间会进行通信,交换网络运行所需要的上下文参数,两者协同完成前向和反向步骤,并将梯度发给Server
,完成参数的更新。

异构参数服务器使用方法¶
下面介绍异构参数服务器的使用方法,推荐先在正常参数服务器模式下运行成功,再开始调试异构参数服务器模式。下面介绍的使用方法,均为在正常参数服务器模式基础上的增量变动,请知晓。
以下示例的完整代码位于FleetX/example/heter_parameter_server/demo.py
1、设置运行在异构设备上的组网
深度学习组网,通常可以拆解为两部分:1、IO密集型组网;2、计算密集型组网,如下面的DNN组网所示:
# --------- IO 密集型网络 ---------
# 数据输入 & embedding 查表 & sequence_pool 等操作
input_data = paddle.data(name="sparse_input", shape=[None, 1], dtype="int64")
input_label = paddle.data(name="label", shape=[None, 1], dtype="int64")
embedding = paddle.static.nn.embedding(input_data, is_sparse=True, size=[1000,128])
# --------- 计算 密集型网络 ---------
# fc & cnn & rnn & attention 等网络结构
fc1 = paddle.static.nn.fc(embedding, size=1024, act="relu")
fc2 = paddle.static.nn.fc(fc1, size=512, act="relu")
fc3 = paddle.static.nn.fc(fc2, size=256, act="relu")
predict = paddle.static.nn.fc(fc3, size=2, act="softmax")
cost = paddle.nn.functional.cross_entropy(input=predict, label=input_label)
我们可以使用fluid.device_guard()
API划分模型中各个OP的运行设备,上述组网可以改变如下:
with fluid.device_guard("cpu"):
input_data = paddle.data(name="sparse_input", shape=[None, 1], dtype="int64")
input_label = paddle.data(name="label", shape=[None, 1], dtype="int64")
label = paddle.cast(input_label, dtype="float32")
embedding = paddle.static.nn.embedding(input_data, is_sparse=True, size=[1000,128])
with fluid.device_guard("gpu"):
fc1 = paddle.static.nn.fc(embedding, size=1024, act="relu")
fc2 = paddle.static.nn.fc(fc1, size=512, act="relu")
fc3 = paddle.static.nn.fc(fc2, size=256, act="relu")
predict = paddle.static.nn.fc(fc3, size=2, act="softmax")
label = paddle.cast(label, dtype="int64")
cost = paddle.nn.functional.cross_entropy(input=predict, label=label)
这样划分组网的作用是:
IO密集型的OP适合在CPU设备上运行,使数据输入输出不再成为模型训练的瓶颈。
计算密集型OP放在GPU等AI芯片设备上,可以充分利用算力,加速模型训练。
与此同时,Paddle-异构参数服务器,支持并且建议您在训练时,CPU-Trainer的设备数量 >> Heter-Trainer的设备数量,可以充分增大数据的IO效率,同时充分利用异构设备的算力。
2、异构参数服务器Strategy配置
使用fleet
api启动异构参数服务器,需要配置DistributedStrategy
,使用上述组网生成的cost,参数服务器模式下,我们使用如下代码添加Optimizer
strategy = paddle.distributed.fleet.DistributedStrategy()
strategy.a_sync = True
optimizer = fluid.optimizer.Adam(args.learning_rate)
optimizer = fleet.distributed_optimizer(optimizer, strategy)
optimizer.minimize(cost)
而在异构参数服务器模式下,仅需额外指定异构设备使用的device类型,其余保持不变,代码如下:
strategy = paddle.distributed.fleet.DistributedStrategy()
strategy.a_sync = True
# ---- 新增strategy配置, 指定异构设备的device类型 ----
strategy.a_sync_configs = {"heter_worker_device_guard": 'gpu'}
optimizer = paddle.optimizer.Adam(args.learning_rate)
optimizer = fleet.distributed_optimizer(optimizer, strategy)
optimizer.minimize(cost)
3、异构参数服务器的启动环境变量配置
启动异构参数服务,需要在参数服务器的基础上,为异构设备指定:
设备IP及通信端口:
PADDLE_HETER_TRAINER_IP_PORT_LIST=ip:port,ip:port,...
训练角色环境变量:
TRAINING_ROLE=HETER_TRAINER
例如:
export PADDLE_HETER_TRAINER_IP_PORT_LIST='ip:port,ip:port'
export TRAINING_ROLE=HETER_TRAINER
当执行fleet初始化代码时:
fleet.init()
# 若进程检测到环境变量中配置了 PADDLE_HETER_TRAINER_IP_PORT_LIST,则会进入异构参数服务器模式,进行相应的计算图切分及初始化。
# 若进程检测到环境变量中 TRAINING_ROLE 存在,并且等于 HETER_TRAINER 时,则该进程扮演异构计算设备的角色
# 异构设备的设备类型由上文中提到的 strategy.a_sync_configs = {"heter_worker_device_guard": 'gpu'} 指定。
我们提供了一键启动的fleetrun
功能,可以便利的启动异构参数服务器训练,将在下文介绍。
使用fleetrun启动异构参数服务器训练¶
fleetrun
是
paddle2.0rc
版本以后新加入的分布式训练启动工具,可以参考fleetrun,下面介绍一下如何使用fleetrun
启动异构参数服务器。
当训练代码ready以后,假如训练启动入口是train.py
,则可按照以下的方式启动异构参数服务器训练:
方法一,针对单机模拟分布式训练,使用自动分配的ip和port
fleetrun --server_num=2 --worker_num=2 --heter_worker_num=2 train.py
方法二,针对单机,或自定义的多机训练,使用指定的ip及端口
fleetrun --servers=ip:port,ip:port --workers=ip:port,ip:port --heter_workers=ip:port,ip:port train.py
方法三,针对PaddleCloud平台的custom-framework模式,指定任务的启动命令
PaddleCloud是百度内部的深度学习任务平台,提供了便捷的提交流程以及任务管理功能,该平台完整功能将适时向广大开发者开放,更多信息,可以查阅PaddleCloud
# heter_worker数量会根据配置的GPU设备数量自动调整
# 添加该配置是为了指定fleetrun运行在异构参数服务器模式下
fleetrun --heter_worker_num=2 train.py
异构参数服务器使用示例¶
示例代码位于FleetX/example/heter_parameter_server/
数据下载
bash sh download_data.sh
执行该脚本,会从国内源的服务器上下载Criteo数据集,并解压到指定文件夹。全量训练数据放置于./train_data_full/
,全量测试数据放置于./test_data_full/
,用于快速验证的训练数据与测试数据放置于./train_data/
与./test_data/
。
至此,我们已完成数据准备的全部工作。
启动训练
# ps-cpu
fleetrun --server_num=2 --worker_num=2 heter_train.py
# ps-heter
fleetrun --server_num=2 --worker_num=2 --heter_worker_num=2 heter_train.py
1. 飞桨底层分布式API的使用案例¶
1.1 简介¶
卷积神经网络主要包含两种类型的模型层:卷积层和全连接层。卷积层包含约5%的模型参数量和约90-95%的模型计算量;全连接层包含约95%的模型参数量和约5-10%的模型计算量。通常来讲,卷积层适合采用数据并行,因为卷积层模型参数的通信开销更小;而全连接层适合采用模型并行,因为相比于全连接层的模型参数,全连接层的输出张量的通信开销更小。因此,本示例中,AlexNet模型的卷积层采用数据并行,全连接层采用模型并行。
本文档以AlexNet网络为例介绍如何使用飞桨的底层集合通信API实现模型并行训练。
1.2 模型并行原理和实现¶
1.2.1 版本要求¶
paddlepaddle 2.0-rc-gpu版本及以上
1.2.2 分布式API使用案例¶
本节,我们介绍如何实现全连接层的模型并行。
首先,汇聚各块GPU卡全连接层的输入数据,得到全局输入数据;并用全局数据和全连接层计算,得到各块GPU卡的全连接层输出,如下图所示。

接着,汇聚各块GPU卡全连接层的输出数据,并抽取本块GPU的样本数据的全连接层输出,如下图所示。

1.2.3 动态图实现¶
上述过程描述地完整前向计算过程实现代码如下:
# -*- coding: UTF-8 -*-
import paddle
import paddle.nn as nn
# 定义模型并行的全连接网络,需要继承自nn.Layer
class ModelParallelLinear(nn.Layer):
def __init__(self,
in_dim,
rank_num,
rank_id,
class_num):
super(ModelParallelLinear, self).__init__()
if class_num % rank_num:
raise ValueError("Number of classes must be divisible "
"the number of ranks.")
shard_dims = class_num // rank_num
self.linear = nn.Linear(in_dim, shard_dims)
self.rank_num = rank_num
self.rank_id = rank_id
for parameter in self.linear.parameters():
parameter.is_distributed = True
def forward(self, x):
global_x_list = []
paddle.distributed.all_gather(global_x_list, x)
global_x = paddle.concat(global_x_list, axis=0)
out = self.linear(global_x)
global_out_list = []
paddle.distributed.all_gather(global_out_list, out)
all_outs = paddle.concat(global_out_list, axis=1)
out = paddle.split(all_outs, self.rank_num)[self.rank_id]
return out
备注:因为每块GPU卡保存部分全连接层参数,上面的例子中设置模型参数的is_distributed
属性为True,用于避免在反向阶段对相应的模型参数做基于all_reduce的同步操作。
完整地训练代码实现如下:
# -*- coding: UTF-8 -*-
import paddle
import paddle.nn as nn
import paddle.nn.functional as F
from paddle.fluid.dygraph import Conv2D
#分布式step 1: 导入paddle.distributed.fleet包
from paddle.distributed import fleet
from model_parallel_linear import ModelParallelLinear
# 定义全连接网络,需继承自nn.Layer
class SimpleModelParallelClassifierNet(nn.Layer):
def __init__(self,
class_num,
rank_num,
rank_id):
super(SimpleModelParallelClassifierNet, self).__init__()
self.conv1 = nn.Conv2d(3, 64, kernel_size=11, stride=4, padding=2)
self.max_pool1 = nn.MaxPool2d(kernel_size=3, stride=2)
self.conv2 = nn.Conv2d(64, 192, kernel_size=5, padding=2)
self.max_pool2 = nn.MaxPool2d(kernel_size=3, stride=2)
self.conv3 = nn.Conv2d(192, 384, kernel_size=3)
self.conv4 = nn.Conv2d(384, 256, kernel_size=3)
self.conv5 = nn.Conv2d(256, 256, kernel_size=3)
self.max_pool5 = nn.MaxPool2d(kernel_size=3, stride=2)
self.model_parallel_linear1 = ModelParallelLinear(2304,
rank_num,
rank_id,
4096)
self.model_parallel_linear2 = ModelParallelLinear(4096,
rank_num,
rank_id,
4096)
self.model_parallel_linear3 = ModelParallelLinear(4096,
rank_num,
rank_id,
class_num)
self.droupout = nn.Dropout(0.5)
self.relu = nn.ReLU()
def forward(self, x):
x = self.conv1(x)
x = self.relu(x)
x = self.max_pool1(x)
x = self.conv2(x)
x = self.relu(x)
x = self.max_pool2(x)
x = self.conv3(x)
x = self.relu(x)
x = self.conv4(x)
x = self.relu(x)
x = self.conv5(x)
x = self.relu(x)
x = self.max_pool5(x)
x = F.dropout(x, 0.5)
x = paddle.reshape(x, [x.shape[0], -1])
x = self.model_parallel_linear1(x)
x = F.dropout(x, 0.5)
x = self.model_parallel_linear2(x)
out = self.model_parallel_linear3(x)
return out
# 分布式step 2: 初始化fleet
fleet.init(is_collective=True)
# 1. 定义网络对象,损失函数和优化器
layer = SimpleModelParallelClassifierNet(class_num=1000,
rank_num=fleet.worker_num(),
rank_id=fleet.worker_index())
adam = paddle.optimizer.Adam(learning_rate=0.001,
parameters=layer.parameters())
# 分布式step 3: 通过fleet获取分布式优化器和分布式模型
adam = fleet.distributed_optimizer(adam)
dp_layer = fleet.distributed_model(layer)
for step in range(20):
# 2. 执行前向网络
image = paddle.randn([1, 3, 224, 224], 'float32')
label = paddle.randint(low=0, high=10, shape=[1,1])
output = dp_layer(image)
loss = F.softmax_with_cross_entropy(output, label)
loss = paddle.mean(loss)
print("step:{}\tloss:{}".format(step, loss.numpy()))
# 3. 执行反向计算和参数更新
# 分布式step 4: 在执行反向(backward函数)前后进行损失缩放和反向梯度的聚合
loss.backward()
adam.step()
adam.clear_grad()
将上述代码保存为train.py,假设要运行2卡任务,那么只需要在命令行执行下面的命令:
fleetrun --gpus=0,1 tain.py
使用流水线并行进行训练¶
简介¶
随着多种神经网络加速设备和专用神经网络计算芯片的出现,采用异构设备训练模型逐渐成为一种趋势。以CPU和GPU异构训练为例,CPU设备的并行计算能力较弱,但其具备数百GB到数TB的内存容量;与之不同,GPU设备具有强大的并行计算能力,但其显存容量仅为数十GB。同时,网络模型的不同层对计算能力和存储容量的需求差异显著。例如,神经网络的embedding层可以理解为查表操作,对计算能力的要求较低,但对存储容量的需求较大;与之相反,卷积类操作通常对存储容量的需求较低,但对计算能力的需求较高。因此,若能够根据异构设备和模型层间的不同特性,对模型的不同层使用不同的计算设备,可以优化模型训练过程。
原理¶
流水线并行分布式技术与数据并行不同,通过将模型切分到多个计算节点,并采用流水线执行的方式,实现模型的并行训练。以下图为例,模型被切分为三个部分,并分别放置到不同的计算设备(第1层放置到设备0,第2、3层被放置到设备1,第四层被放置到设备2);设备间通过通信的方式来交换数据。

具体地讲,前向计算过程中,输入数据首先在设备0中通过第1层的计算得到中间结果,并将其传输到设备1,然后由设备1计算第2层和第3层,经过最后一层的计算后得到最终的前向计算结果;反向传播过程中,第四层使用前向计算结果得到相应的梯度数据,并由设备2传输到设备1,一次经过第3层和第二层,将结果传至设备0,经过第1层的计算完成所有的反向计算。最后,各个设备上的网络层会更新参数信息。
如下图,为流水线并行中的时序图。简单的流水线并行方式下,任一时刻只有单个计算设备处于激活状态,其它计算设备则处于空闲状态,因此设备利用率和计算效率较差。

为了优化流水线并行的性能,我们可以将mini-batch切分成若干更小粒度的micro-batch,提升流水线并行的并发度,达到提升设备利用率和计算效率的目的。如下图所示,一个mini-batch被切分为4个micro-batch;前向阶段,每个设备依次计算单个micro-batch的结果;这种减小mini-batch的方式减少了每个设备完成一次计算的时间,进而增加了设备间的并发度。

下面我们将通过例子为您讲解如何使用pipeline策略在两张GPU上训练模型。
使用样例¶
导入依赖¶
# -*- coding: UTF-8 -*-
import os
import argparse
import paddle
import time
import math
import numpy as np
import paddle.distributed.fleet as fleet
import paddle.static.nn as nn
paddle.enable_static()
定义模型¶
在使用流水线并行的训练策略时,我们通过device_guard
接口将不同的计算层放置在不同的设备上。
对于CPU设备,在使用device_guard
时只需要指定设备类型,即device_guard("cpu")
;对于GPU设备,除了指定设备类型外,还需要指定设备的id,如device_guard("gpu:0")
。
在下面的例子中,我们将数据层及embedding层放置在CPU中, 并将fc及loss放置在第0号GPU卡上。
# 模型组网
def build_network():
# Step1: 使用device_gurad指定相应层的计算设备
with paddle.fluid.device_guard("cpu"):
data = paddle.data(name='sequence', shape=[1], dtype='int64')
data_loader = paddle.io.DataLoader.from_generator(
feed_list=[data],
capacity=64,
use_double_buffer=True,
iterable=False)
emb = nn.embedding(input=data, size=[128, 64])
with paddle.fluid.device_guard("gpu:0"):
fc = nn.fc(emb, size=10)
loss = paddle.mean(fc)
return data_loader, loss
定义数据集及梯度更新策略¶
定义完模型后,我们可以继续定义训练所需要的数据,以及训练中所用到的更新策略。
通过设定dist_strategy.pipeline
为True,将流水线并行的策略激活。
fleet.init(is_collective=True)
data_loader, loss = build_network()
dist_strategy = paddle.distributed.fleet.DistributedStrategy()
dist_strategy.pipeline = True
optimizer = paddle.fluid.optimizer.SGDOptimizer(learning_rate=0.1)
optimizer = fleet.distributed_optimizer(optimizer, dist_strategy)
optimizer.minimize(loss)
def train_reader():
for _ in range(100):
data = np.random.random(size=[32, 1]).astype("int64")
yield data
开始训练¶
place = paddle.CPUPlace()
exe = paddle.static.Executor(place)
data_loader.set_sample_generator(train_reader, batch_size=2)
exe.run(paddle.static.default_startup_program())
data_loader.start()
exe.train_from_dataset(paddle.static.default_main_program())
低频通信参数服务器训练算法¶
简介¶
众所周知,在同步/异步参数服务器分布式训练中Trainer每训练完一个周期,都会将梯度上传至Server,等待Server分发最新的参数后才开始新一轮的训练。在这种训练方式中,节点间的通信会消耗大量的时间成本,进而影响训练的效率。
为了降低节点见通信对训练速度的影响,Fleet提供了一种更高效的参数更新策略:GeoSGD
原理¶

在GeoSGD更新策略中,Trainer的参数更新也是在全异步的条件下进行的。但与异步参数服务器有以下不同:
与普通的参数服务器不同,在GEO策略中,每个Trainer负责在本地维护自己的参数更新,在训练一定数量的步数后将本轮训练出的参数与上一轮结束后的参数做差。并除以Trainer的个数,将结果上传至Server。Server则负责为每个Trainer计算其参数与全局参数的diff。
GEO更新策略会在训练过程中启动多个进程,负责参数更新及节点通信。在Trainer与Server的整个交互过程中,主进程会保持模型的训练,由子进程负责与server进行交互,在拿到与全局参数的diff后将其更新至主进程。
GEO策略通过模型训练与节点通信同步进行的方式,在保证模型效果的前提下,大大提升了训练的速度。经过在SGD优化器上的测试,GEO策略相比异步参数服务器,训练速度提高了1倍。
接下来我们将通过例子为您讲解GEO在Fleet中是如何应用的。
在开始之前我们首先需要下载训练中所需要的数据:
# 下载并解压数据,训练数据讲保存至名为 raw_data 的文件夹
wget --no-check-certificate https://fleet.bj.bcebos.com/ctr_data.tar.gz
tar -zxvf ctr_data.tar.gz
操作实践¶
添加依赖¶
首先我们需要添加训练中所用到的python模块,fleetx
可以用于加载我们为用户封装的接口如:加载模型及数据,模型训练等。paddle.distributed.fleet
中定义了丰富的分布式策略供用户使用。
import paddle
import fleetx as X
import paddle.fluid as fluid
import paddle.distributed.fleet as fleet
import paddle.distributed.fleet.base.role_maker as role_maker
定义分布式模式并初始化¶
通过X.parse_train_configs()
接口,用户可以定义训练相关的参数,如:学习率、衰减率等。同时通过fleet.init()
接口定义了分布式模式,定义GEO策略使用的初始化接口与同步/异步参数服务器相同,都是init()
默认的模式。
paddle.enable_static()
configs = X.parse_train_configs()
role = role_maker.PaddleCloudRoleMaker()
fleet.init(role)
加载模型及数据¶
在这个例子中我们使用了与同步/异步参数服务器相同的CTR-DNN模型。用X.applications
接口加载模型,并加载定制化的数据。
model = X.applications.MultiSlotCTR()
loader = model.load_multislot_from_file('./train_data')
定义同步训练 Strategy 及 Optimizer¶
在Fleet
API中,用户可以使用fleet.DistributedStrategy()
接口定义自己想要使用的分布式策略。
想要使用GEO策略,用户首先需要打开异步参数服务器开关,即设置a_sync
为
True。
然后用户需要通过dist_strategy.a_sync_configs
设置Trainer上传参数的频率,下面的代码中我们设置Trainer每训练10000个Batch后与Server进行交互。
dist_strategy = fleet.DistributedStrategy()
dist_strategy.a_sync = True
dist_strategy.a_sync_configs = {"k_steps": 10000}
optimizer = fluid.optimizer.SGD(learning_rate=0.0001)
optimizer = fleet.distributed_optimizer(optimizer, dist_strategy)
optimizer.minimize(model.loss)
开始训练¶
GEO策略的训练代码沿用了参数服务器分布式训练的形式。
对于Server节点,首先用init_server()
接口对其进行初始化,然后启动服务并开始监听由训练节点传来的参数变化值。
同样对于训练节点,用init_worker()
接口进行初始化后x,开始执行训练任务。运行X.Trainer.fit
接口开始训练。
if fleet.is_server():
fleet.init_server()
fleet.run_server()
else:
fleet.init_worker()
trainer = X.Trainer(fluid.CPUPlace())
trainer.fit(model, loader, epoch=10)
运行训练脚本¶
定义完训练脚本后,我们就可以用fleetrun
指令运行分布式任务了。其中server_num
,
worker_num
分别为服务节点和训练节点的数量。在本例中,服务节点有1个,训练节点有两个。
fleetrun --server_num=1 --worker_num=2 ctr_app.py
服务型弹性蒸馏¶
简介¶
蒸馏训练¶
在很多场景下,模型越大,层数越多,模型效果就越好。但受限于推理速度,显存资源等要求,大模型通常无法直接部署,需要对模型进行压缩。知识蒸馏是一种经典的模型压缩技术,由《Distilling the Knowledge in a Neural Network》 在2015年第一次提出,是将知识从一个复杂模型(Teacher)迁移到另一个轻量级模型(Student)上的方式来实现模型压缩。
如下图所示,训练步骤可以分为两步:
训练好一个Teacher模型。
使用Teacher模型的知识来训练Student模型。 所谓Teacher模型的知识是指Teacher模型的推理结果,我们称之为soft label,这个soft label将作为Student网络的训练目标,Student的推理结果需要尽可能接近Teacher的推理结果。

服务型蒸馏训练¶
服务型蒸和其他常见蒸馏方式的对比:
离线蒸馏训练: 先使用Teacher模型做推理并将结果保存在磁盘中,然后Student模型使用磁盘中保存的样本和Teacher模型的推理结果作为数据集进行训练。这种训练方式一般需要数据增强,而且需要占用巨大的磁盘空间,因此应用环境受到了一定的限制。
常规蒸馏训练: 常规蒸馏训练是指将 Teacher 模型和 Student 模型放入同一网络中,固定 Teacher 模型参数只做前向,Student 模型则正常做反向传播训练。这也是目前主流的蒸馏训练方式, 单这种方式下 Student 模型的训练完全依赖 Teacher 模型,Student 模型要等 Teacher 模型输出一个 batch 的推理结果才可以训练,而 teacher 模型也要等 Student 训练完一个 batch,才能开始下一个 batch 的推理,对整体的训练速度有一定的影响。
服务型蒸馏训练: 是基于 Elastic Deep Learning 提出的一种训练方案。它将Teacher模型和Student模型解耦,Teacher模型被部署为线上推理服务,Student模型则以客户端的身份通过互联网实时发送样本到Teacher模型获取推理结果进行训练
服务型蒸馏训练收益¶
节约显存资源: 由于Student模型和Teacher模型的解耦,所以服务型蒸馏训练可以使用异构的资源,也就是把Student模型和Teacher模型的部署到不同的设备上。原先受限于显存大小而难以部署到单个GPU卡上的蒸馏网络可以通过该方式部署到不同卡上。
提升训练速度:由于节约了显存资源,这样就可以使Student模型能够训练更大的batch size;同时由于Student模型和Teacher模型是异构流水线,Student模型不用等Teacher模型推理结束后再训练。
提高训练资源利用率:我们可以将Teacher模型部署到线上的弹性预估卡集群,利用线上预估卡闲时的算力资源提升蒸馏任务中Teacher模型侧的吞吐量。同时由于Teacher模型可以弹性调度,不用担心高峰时线上实例被抢占造成的任务失败。还可以将Teacher模型部署到集群碎片资源,或者如k40等使用率较低的资源上,充分利用集群的空闲、碎片资源。
提升训练效率:用户可以根据Teacher和Student的吞吐性能灵活设置Teacher和Student的比例,也就是说多个老师可以教多个学生,而不是只能保持1比1的家教模式,最大限度地提高训练的产出。
EDL 服务型弹性蒸馏效果¶
ResNet50_vd模型, ImageNet 数据集
服务型弹性蒸馏¶
DistillReader¶
服务型弹性蒸馏的核心是将Teacher模型部署成了服务端,而Student模型成了客户端。
将Teacher模型被部署为在线可容错弹性服务, 在Student模型一侧则通过
DistillReader
来封装Student模型与Teacher模型之间的通信,访问Teacher服务。

DistillReader 产生可供Student模型训练的数据reader。如上图所示,Student模型将训练样本和标签传入训练reader,DistillReader从训练reader中读取训练样本发送给Teacher模型,然后获取推理结果。 DistillReader 的结构如下图。
推理结果和原训练reader中的数据封装在一起,返回一个包含推理结果的新reader给Student模型,这样Teacher 模型的推理和Student 模型的训练就可以流水行并行起来了。

可容错弹性服务¶
可容错弹性服务的实现架构如下图所示,首先我们通过Paddle Serving将多个Teacher模型部署成服务,并注册服务到Redis数据库中;Student模型则作为客户端从服务发现中查询所需的Teacher服务;服务发现从Redis数据库查询并按某种负载均衡策略返回客户端所需的Teacher列表;每当Teacher变化时,客户端就可以实时拿到最新Teacher列表,连接Teacher进行蒸馏训练,不用担心发生由于连接到被收回的Teacher资源而导致任务失败的请况。
Student 模型给Teacher 模型发送样本并获取推理结果,而Teacher 模型服务端则可以随意增删,弹性调整。

快速开始¶
下文通过训练图像分类模型来简单介绍服务型蒸馏训练的使用。
为简单起见,使用的是单机环境,服务端和客户端部署在了同一个服务器上,服务端的IP地址是127.0.0.1。如果部署在不同设备上,修改下代码中的IP地址即可。
环境准备¶
下命令拉取镜像,镜像为CUDA9.0的环境,在里面我们预装了EDL、飞桨核心框架和Padde Serving等相关依赖。
docker pull hub.baidubce.com/paddle-edl/paddle_edl:latest-cuda9.0-cudnn7
nvidia-docker run -name paddle_edl hub.baidubce.com/paddle-edl/paddle_edl:latest-cuda9.0-cudnn7 /bin/bash
启动Teacher模型¶
如下命令在1号GPU卡启动Teacher服务,其中Teacher模型为图像分类模型ResNeXt101_32x16d_wsl,服务的端口号为9898,并启动了内存优化功能。
cd example/distill/resnet
wget --no-check-certificate https://paddle-edl.bj.bcebos.com/distill_teacher_model/ResNeXt101_32x16d_wsl_model.tar.gz
tar -zxf ResNeXt101_32x16d_wsl_model.tar.gz
python -m paddle_serving_server_gpu.serve \
--model ResNeXt101_32x16d_wsl_model \
--mem_optim True \
--port 9898 \
--gpu_ids 1
启动Student模型训练¶
如下命令在0号GPU卡启动Student模型,启动的student模型为ResNet50_vd。 其中train_with_fleet.py是用于启动训练的脚本,用户需要在其中添加蒸馏训练相关的代码,如果用户想了解脚本的修改方法或可以参考如github。
python -m paddle.distributed.launch --gpus 0 \
./train_with_fleet.py \
--model=ResNet50_vd \
--data_dir=./ImageNet \
--use_distill_service=True \
--distill_teachers=127.0.0.1:9898
推荐阅读:¶
优化低配网络的分布式GPU训练¶
在网络带宽较低的训练场景(如:
公有云上训练,联邦训练)中,梯度同步在低带宽网络下的延迟成为训练速度的主要瓶颈。
Fleet 作为Paddle通用的分布式训练API 实现了: Deep Gradient Compression
和 Local SGD
两种训练策略来针对性解决这一问题。
DGC 优化低配网络的分布式GPU训练¶
DGC 简介¶
大规模分布式训练需要较高的网络带宽以便进行梯度的聚合更新,这限制了多节点训练的扩展性,同时也需要昂贵的高带宽设备。在低带宽的网络环境下进行分布式训练时,梯度同步成为训练加速的瓶颈。
Deep Gradient Compression
发现:分布式SGD中有99.9%的梯度交换都是冗余的,可以使用深度梯度压缩选择重要梯度进行通信来减少通信量,降低对通信带宽的依赖。Fleet
实现了DGC的稀疏通信方式,可有效在低配网络下进行GPU分布式训练。Fleet
实现了 DGC 论文中的 预热训练 (warming up training)
,
动量修正 (Momentum Correction)
,
局部梯度修剪 (local gradient clipping)
,
动量因子掩藏 (Momentum factor masking)
等策略, 和
正则化项修正 (Weight Decay Correction)
避免稀疏梯度通信训练带来的最终模型精度损失。
下面将介绍 DGC 稀疏通信方式的适用场景、试验效果、基本原理和使用方法。
适用场景¶
DGC稀疏通信在低带宽通信瓶颈时会有较大的性能提升,但在单机多卡及RDMA网络通信并非瓶颈情况下,并不会带来性能上的提升。同时由于AllGather的通信量会随卡数的增多而增大,所以DGC的多机训练规模也不宜过大。故DGC适用于低配网络,同时节点规模不宜过大,如>128张卡。在云网络或高带宽网络设备昂贵时,DGC可有效降低训练成本。
试验效果¶
模型:FasterRCNN
硬件: P40两机分布式,每台机器一卡,TCP网络测试。
取300-700步耗时/400step。
精度无损。
DGC 原理简介¶
这里将简单介绍介绍Fleet DGC 中的一些原理和对应参数应该如何设置。
梯度稀疏¶
DGC的基本思路是通过只传送重要梯度,即只发送大于给定阈值的梯度来减少通信带宽的使用。为避免信息的丢失,DGC会将剩余梯度在局部累加起来,最终这些梯度会累加大到足以传输。 换个角度,从理论依据上来看,局部梯度累加等同于随时间推移增加batch size,(DGC相当于每一个梯度有自己的batch size)。
假设 N是训练节点个数, b为单卡batch size,局部梯度累加可以被认为batch size从\(Nb\)增大为\(NbT\),其中T是两次更新的稀疏通信间隔。详细的公式推导请参阅 [1] 预热调参 ^^^^^^^^
对于正常的训练,使用DGC一般需进行预热训练,否则可能会有精度损失。由于paddle稀疏梯度聚合通信使用了AllGather,通信量会随卡数增加而增长,所以在卡数较多时不推荐较低稀疏度的预热训练。参数设置如下:
# 1. 以1252个step为一个epoch,前2个epochs使用正常dense通信,后3个epochs逐步提升稀疏度为99.9%
strategy.dgc_configs = {
"rampup_begin_step": 1252*2,
"rampup_step": 1252*3,
"sparsity": [0.984375, 0.996, 0.999]
}
# 2. 前面4个epochs都使用dense通信,之后默认0.999稀疏度运行
strategy.dgc_configs = {
"rampup_begin_step": 1252*4,
"rampup_step": 1,
"sparsity": [0.999]
}
对于Fine-tuning训练,可无需预热训练,从第0个epoch直接使用DGC即可。
# 从第0步开始DGC稀疏通信
strategy.dgc_configs = {
"rampup_begin_step": 0,
"rampup_step": 1,
"sparsity": [0.999]
}
局部梯度累加改进¶
正常情况,稀疏更新会严重影响收敛性。DGC中采用动量修正(Momentum Correction)和局部梯度裁减(Local Gradient Clipping), 动量因子掩藏, 正则化项修正 4个策略来解决这个问题。
动量修正¶
上文”局部梯度累加等同于随时间推移增加batch size“的推导没有考虑 Momentum存在的情况。当稀疏度很高时,使用原始Momentum 公式会显著降低模型性能,所以需要在原始公式的基础上对梯度进行修正。
动量修正使用部累加速度项\(u_t\)而非累加真实的梯度\(\nabla_{k, t}\)来修改Momentum 方程,修正后的动量更新公式如下:
局部梯度修剪¶
梯度修剪是防止梯度爆炸的常用方法。这方法由Pascanu等人在2013年提出,当梯度的l2-norms和大于给定阈值时,就对梯度rescale。正常梯度修剪在梯度聚合后使用,而DGC因为每个节点独立的进行局部梯度累加,所以DGC在使用\(G_t\)累加前对其进行局部梯度修剪。阈值缩放为原来的\(N^{-1/2}\) 。
动量因子掩藏¶
因为推迟了较小梯度更新权重的时间,所以会有权重陈旧性问题。稀疏度为99.9%时大部分参数需600到1000步更新一次。迟滞效应会减缓收敛并降低模型精度。DGC中使用下面方程来掩藏动量因子减缓陈旧性问题。
此掩码可以停止延迟梯度产生的动量,防止陈旧梯度把权重引入错误的方向。
正则化(Weight Decay)项修正¶
类似动量修正,DGC 中我们同样需要对正则化项进行修正来让参数的延迟更新方向更加准确。
和动量修思路相同,修正需要在局部梯度上添加局部Weight Decay。
上述策略已经在Fleet 框架中实现,用户无须设置。
DGC 快速开始¶
下文以单机八卡上训练ResNet50 为例子简单介绍 Fleet 中 DGC 的使用。 因为 8张 GPU 的通信都在同一节点内, 一般情况下梯度通信并不会成为训练的瓶颈, 这里只是以其为例子,介绍Fleet 中 DGC 参数的设置。
注意:
硬件环境要求: DGC目前只支持GPU多卡及分布式collective训练,需要有相应的cuda、cuDNN、nccl环境。
Paddle环境要求: DGC只支持GPU,所以需GPU版本的Paddle。
添加依赖¶
import os
import fleetx as X
import paddle
import paddle.fluid as fluid
import paddle.distributed.fleet.base.role_maker as role_maker
import time
import paddle.distributed.fleet as fleet
定义分布式模式并初始化¶
通过X.parse_train_configs()
接口,用户可以定义训练相关的参数,如:学习率、衰减率等。同时通过fleet.init()
接口定义了分布式模型,下面代码中的is_collective=True
表示采用集合通信的GPU分布式模式训练模型。
paddle.enable_static()
configs = X.parse_train_configs()
fleet.init(is_collective=True)
加载模型及数据¶
用户可以通过X.applications
接口加载我们预先定义好的模型,如:Resnet50、VGG16、BERT等。并使用定制化的data_loader加载模型,同时可以定义训练中使用的batch_size等参数。
model = X.applications.Resnet50()
downloader = X.utils.Downloader()
local_path = downloader.download_from_bos(
fs_yaml='https://fleet.bj.bcebos.com/test/loader/small_imagenet.yaml',
local_path='./data')
batch_size = 32
loader = model.get_train_dataloader(local_path, batch_size=batch_size)
DGC 相关策略¶
这里假设:1252个step为一个epoch,前2个epochs使用正常dense通信,后3个epochs逐步提升稀疏度为99.9%
rampup_begin_step (int)
:DGC(含预热训练)开始的 steprampup_step (int)
:DGC中预热训练持续的 step. 如果sparsity 是 [0.75, 0.9375, 0.984375, 0.996, 0.999],rampup_step 设成 100时, 在 0~19 steps 时 sparsity=0.75,在 20~39 steps 时 sparsity=0.9375, 以此类推。sparsity (list[float])
:稀疏度 threshold, (1 - current sparsity) % 的gradient 将会被 allreduce。
dist_strategy = fleet.DistributedStrategy()
dist_strategy.dgc = True
dist_strategy.dgc_configs = {
"rampup_begin_step": 1252*2,
"rampup_step": 1252*3,
"sparsity": [0.984375, 0.996, 0.999]
}
optimizer = fluid.optimizer.Momentum(learning_rate=0.01, momentum=0.9)
optimizer = fleet.distributed_optimizer(optimizer, dist_strategy)
optimizer.minimize(model.loss)
开始训练¶
这一部分和Fleet 中其他任务基本相同:
place = fluid.CUDAPlace(int(os.environ.get('FLAGS_selected_gpus', 0)))
exe = fluid.Executor(place)
exe.run(fluid.default_startup_program())
for i, data in enumerate(loader()):
start_time = time.time()
cost_val = exe.run(model.main_prog,
feed=data,
fetch_list=[model.loss.name])
end_time = time.time()
print(
"worker_index: %d, step%d cost = %f, speed: %f"
% (fleet.worker_index(), i, cost_val[0], batch_size / (end_time - start_time)))
运行训练脚本¶
一行启动单机多卡分布式训练:
fleetrun --gpus 0,1,2,3,4,5,6,7 --log_dir log ./example_dgc.py
# reader shuffle seed 0
# trainerid, trainer_count 0 8
# read images from 0, length: 160146, lines length: 160146, total: 1281168
# worker_index: 0, step0 cost = 7.151402, speed: 37.698432
# worker_index: 0, step1 cost = 7.112389, speed: 101.518513
# worker_index: 0, step2 cost = 7.004275, speed: 111.062341
# worker_index: 0, step3 cost = 7.039385, speed: 62.173126
# worker_index: 0, step4 cost = 6.985911, speed: 104.058060
# ......
使用Local SGD 优化低带宽下分布式训练¶
Local SGD 简介¶
在使用 distributed SGD 进行数据并行的分布式训练时,常会遇到以下两个问题:
分布式训练的吞吐会受到集群中随机慢节点(straggling node)和通信延迟的影响。
数据并行分布式增大了训练实际的batch size,过大的batch size 会影响最终的训练精度。
Local SGD
通过延长节点间同步的间隔(局部异步训练)来减轻慢节点的影响和减少通信频率,以此提升训练的吞吐速度;另一方面,为了减小相对于本地训练(小batch
size)的精度损失,[1] 和 [2]
分别提出了:post-Local SGD
和
自适应步长 (Adaptive Communication) Local SGD
策略,来减少参数同步频率降低带来的精度损失。 同步SGD 和 Local
SGD 在通信同步上的差异如下图所示。

在Local SGD 训练中,集群中的每个 trainer 各自会独立的进行 H 个连续的 SGD 更新, 然后集群中的所有 trainer 会进行通信,同步(averaging)所有 trainers 上的参数。一个双 trainers,同步间隙为3 步长(iterations) 的Local SGD过程如下图所示。黄绿两条路径表示两个 trainers 各自的 Local SGD 更新过程,中间的蓝色路径表示同步后的模型所在的位置。

Local SGD中的一个关键问题是如何确定参数同步的间隔(频率),以到达训练吞吐和训练精度间更好的平衡:
增大参数同步的间隔可以减少 trainers 间通信延迟的影响提高训练吞吐.
但增大同步间隔可能会造成最终训练精度的损失。 [1]
以下两个策略从不同角度试图达到更好的平衡:
post Local SGD 将训练过程分成两个阶段:第一阶段 wokers 间同步的间隔为 1 个步长,即同步SGD,来保证最终训练精度;在第二阶段增大同步间隔到固定常数 H,来提升训练吞吐。
Adaptive Communication Local SGD 通过动态的调整参数同步的间隔来尝试达到训练吞吐和精度间的更好的平衡。在训练初始或者上一段参数同步完成后,根据如下公式计算一下次参数同步的间隔(iteration)。详细的公式推导和参数定义请参考原论文。
Fleet 中实现了 post Local SGD
和
Adaptive Communication Local SGD
两种策略。 中下文将给出 Fleet中
Local SGD 的实践效果,并通过一个简单例子介绍如何在Fleet 中使用 Local
SGD。
试验效果¶
试验设置
model |
dataset |
local batch size |
cluster |
dtype |
warming up |
learning rate decay |
---|---|---|---|---|---|---|
resnet50 |
Imagenet |
128 |
4 x 8 x V100 |
FP32 |
30 |
polynomial |
试验结果
local step |
qps |
acc1 |
acc5 |
---|---|---|---|
1 |
8270.91 |
0.7579 |
0.9266 |
2 |
8715.67 |
0.7533 |
0.9265 |
4 |
8762.66 |
0.7551 |
0.9260 |
8 |
9184.62 |
0.7511 |
0.9239 |
16 |
9431.46 |
0.7429 |
0.9206 |
ADACOMM |
8945.74 |
0.7555 |
0.9270 |
可以看到在 post Local SGD (固定同步间隔)情况下,更新间隔越长训练的吞吐越高,但是模型的最终进度也会损失越大。 当使用 ADAPTIVE COMMUNICATION 策略后,训练在吞吐和精度间达到了一个更好的平衡。
Local SGD 快速开始¶
下文将以在单机8卡中训练 ResNet50 为例子简单介绍 Fleet 中 Local SGD 的用法。 需要注意的是 单机八卡的通信都在同一节点内, 一般情况下参数同步并不会成为训练的瓶颈, 这里只是以其为例子,介绍Fleet 中 Local SGD 参数的设置。
添加依赖¶
import os
import fleetx as X
import paddle
import paddle.fluid as fluid
import paddle.distributed.fleet.base.role_maker as role_maker
import time
import paddle.distributed.fleet as fleet
定义分布式模式并初始化¶
通过X.parse_train_configs()
接口,用户可以定义训练相关的参数,如:学习率、衰减率等。同时通过fleet.init()
接口定义了分布式模型,下面代码中的is_collective=True
表示采用集合通信的GPU分布式模式训练模型。
paddle.enable_static()
configs = X.parse_train_configs()
fleet.init(is_collective=True)
加载模型及数据¶
用户可以通过X.applications
接口加载我们预先定义好的模型,如:Resnet50、VGG16、BERT等。并使用定制化的data_loader加载模型,同时可以定义训练中使用的batch_size等参数。
model = X.applications.Resnet50()
downloader = X.utils.Downloader()
local_path = downloader.download_from_bos(
fs_yaml='https://fleet.bj.bcebos.com/test/loader/small_imagenet.yaml',
local_path='./data')
batch_size = 32
loader = model.get_train_dataloader(local_path, batch_size=batch_size)
定义Local SGD 相关策略¶
用户首先需要定义paddle SGD 对象,并在SGD 对象中设置学习率参数。目前local SGD和自适应步长 local SGD都仅支持SGD和Momentum两种优化器。
在post Local SGD 中,有两个用户设置参数
begin_step
和k_steps
,局部更新和参数同步都由框架自动完成。begin_step 指定从第几个step之后进行local SGD算法,取值为大于0的整数;k_step 指定训练过程中的全局参数更新间隔,取值为大于0的整数。
dist_strategy = fleet.DistributedStrategy()
dist_strategy.localsgd = True
dist_strategy.localsgd_configs = {
"k_steps": 1,
"begin_step": 1,
}
optimizer = fluid.optimizer.SGD(learning_rate=0.01)
optimizer = fleet.distributed_optimizer(optimizer, dist_strategy)
optimizer.minimize(model.loss)
在 自适应步长 local SGD 中,有两个用户设置参数
begin_step
和init_k_steps
。begin_step 指定从第几个step之后进行自适应local SGD算法,取值为大于0的整数;用户需要设置init_k_steps作为第一次参数同步的间隔,之后的同步间隔将由上文中的公式动态确定,在学习率较大时,参数变化大,减小step,多进行通信从而保证快速收敛;在学习率较小时,参数变化小,增大step,减少通信次数,从而提升训练速度。 需要注意的是自适应步长策略中,系统会默认限制最大的同步间隔为 16 step,当公式计算出的间隔大于16 时,按16 steps 进行参数同步。
dist_strategy = fleet.DistributedStrategy()
dist_strategy.adaptive_localsgd = True
dist_strategy.adaptive_localsgd_configs = {
"init_k_steps": 1,
"begin_step": 1,
}
optimizer = fluid.optimizer.SGD(learning_rate=0.01)
optimizer = fleet.distributed_optimizer(optimizer, dist_strategy)
optimizer.minimize(model.loss)
开始训练¶
这一部分和Fleet 中其他任务基本相同:
place = fluid.CUDAPlace(int(os.environ.get('FLAGS_selected_gpus', 0)))
exe = fluid.Executor(place)
exe.run(fluid.default_startup_program())
for i, data in enumerate(loader()):
start_time = time.time()
cost_val = exe.run(model.main_prog,
feed=data,
fetch_list=[model.loss.name])
end_time = time.time()
print(
"worker_index: %d, step%d cost = %f, speed: %f"
% (fleet.worker_index(), i, cost_val[0], batch_size / (end_time - start_time)))
运行训练脚本¶
一行启动单机多卡分布式训练:
fleetrun --gpus 0,1,2,3,4,5,6,7 --log_dir log resnet50_localsgd.py
# reader shuffle seed 0
# trainerid, trainer_count 0 8
# read images from 0, length: 160146, lines length: 160146, total: 1281168
# worker_index: 0, step0 cost = 7.151402, speed: 37.698432
# worker_index: 0, step1 cost = 7.112389, speed: 101.518513
# worker_index: 0, step2 cost = 7.004275, speed: 111.062341
# worker_index: 0, step3 cost = 7.039385, speed: 62.173126
# worker_index: 0, step4 cost = 6.985911, speed: 104.058060
# ......
飞桨分布式训练基线报告¶
Resnet50性能基准¶
Resnet50是当前视觉领域比较通用的预训练模型后端,同时也作为评价深度学习框架训练性能的最重要模型之一,我们提供Resnet50在ImageNet数据集上的性能基准供用户参考。
软硬件配置情况¶
基本版本信息¶
软硬件指标 |
具体配置 |
---|---|
实例类型 |
百度X-Man 2.0 |
单实例GPU |
8x NVIDIA® Tesla® V100 |
操作系统 |
Ubuntu 16.04 LTS with tests run via Docker |
CPU |
Intel(R) Xeon(R) Gold 6271C CPU @ 2.60GHz |
内存 |
512G |
CUDA / CUDNN版本 |
10.1 / 7.6.5 |
NCCL / DALI 版本 |
2.4.7 / 0.24.0 |
多GPU实例互联信息 |
InfiniBand 100 Gb/sec |
Paddle Github Commit |
|
FleetX Github Commit |
|
硬盘类型 |
本地SSD硬盘 |
数据集 |
ImageNet |
评估模型 |
Resnet50 |
复现代码地址 |
|
Python版本 |
3.7 |
硬件拓扑¶
nvidia-smi topo -m
GPU0 GPU1 GPU2 GPU3 GPU4 GPU5 GPU6 GPU7 mlx5_0 CPU Affinity
GPU0 X NV2 NV2 NV1 NV1 NODE NODE NODE NODE 0-23
GPU1 NV2 X NV1 NV1 NODE NV2 NODE NODE NODE 0-23
GPU2 NV2 NV1 X NV2 NODE NODE NV1 NODE NODE 0-23
GPU3 NV1 NV1 NV2 X NODE NODE NODE NV2 NODE 0-23
GPU4 NV1 NODE NODE NODE X NV2 NV2 NV1 NODE 0-23
GPU5 NODE NV2 NODE NODE NV2 X NV1 NV1 NODE 0-23
GPU6 NODE NODE NV1 NODE NV2 NV1 X NV2 NODE 0-23
GPU7 NODE NODE NODE NV2 NV1 NV1 NV2 X NODE 0-23
mlx5_0 NODE NODE NODE NODE NODE NODE NODE NODE X
Legend:
X = Self
SYS = Connection traversing PCIe as well as the SMP interconnect between NUMA nodes (e.g., QPI/UPI)
NODE = Connection traversing PCIe as well as the interconnect between PCIe Host Bridges within a NUMA node
PHB = Connection traversing PCIe as well as a PCIe Host Bridge (typically the CPU)
PXB = Connection traversing multiple PCIe switches (without traversing the PCIe Host Bridge)
PIX = Connection traversing a single PCIe switch
NV# = Connection traversing a bonded set of # NVLinks
性能测试方法¶
硬件资源 采用多机多卡训练,以实例数 x 单实例GPU卡数作为评价标准,评价
1 x 1
,1 x 8
,2 x 8
,4 x 8
情况下的性能基准。训练超参数 批量大小(Batch Size)对训练性能影响最大,因此会对比不同批量大小下模型的训练吞吐。注意,改变批量大小通常需要调整优化算法,但为了对比公平,暂不对优化算法做调整,即不考虑收敛的对比。
测试指标获取方法 当前主流的深度学习框架通常采用异步数据读取,由于训练开始前框架并没有提前开始读取数据,整个训练速度存在一定的IO瓶颈。我们的测试方法是忽略前20个
step
,然后取后面100个step的平均吞吐作为单次任务的训练吞吐。为了避免硬件的波动(例如网络通信等)对性能的影响,我们会利用相同的配置进行7次运行,取中值。
基准测试结果¶
单位:
Images/s
,使用精度FP32,DistributedStrategy
如下:
import paddle
import paddle.distributed.fleet as fleet
dist_strategy = fleet.DistributedStrategy()
exec_strategy = fluid.ExecutionStrategy()
exec_strategy.num_threads = 2
exec_strategy.num_iteration_per_drop_scope = 100
dist_strategy.execution_strategy = exec_strategy
build_strategy = fluid.BuildStrategy()
build_strategy.enable_inplace = False
build_strategy.fuse_elewise_add_act_ops = True
build_strategy.fuse_bn_act_ops = True
dist_strategy.build_strategy = build_strategy
dist_strategy.nccl_comm_num = 1
batch / node |
1 x 1 |
1 x 8 |
2 x 8 |
4 x 8 |
---|---|---|---|---|
32 |
335.43 |
2488.49 |
4629.71 |
9093.41 |
64 |
353.38 |
2643.75 |
5325.44 |
10536.83 |
128 |
368.11 |
2797.31 |
5635.98 |
11261.72 |
单位:
Images/s
,使用自动混合精度Automatic Mixed Precision(AMP)进行训练,DistributedStrategy
如下:
import paddle
import paddle.distributed.fleet as fleet
dist_strategy = fleet.DistributedStrategy()
exec_strategy = fluid.ExecutionStrategy()
exec_strategy.num_threads = 2
exec_strategy.num_iteration_per_drop_scope = 100
dist_strategy.execution_strategy = exec_strategy
build_strategy = fluid.BuildStrategy()
build_strategy.enable_inplace = False
build_strategy.fuse_elewise_add_act_ops = True
build_strategy.fuse_bn_act_ops = True
dist_strategy.build_strategy = build_strategy
dist_strategy.amp = True
dist_strategy.nccl_comm_num = 1
batch / node |
1 x 1 |
1 x 8 |
2 x 8 |
4 x 8 |
---|---|---|---|---|
32 |
740.01 |
4467.82 |
8628.19 |
16970.01 |
64 |
919.95 |
6148.98 |
12071.29 |
23682.78 |
128 |
1018.3 |
7324.31 |
14342.03 |
28397.43 |
256 |
1096.5 |
8166.11 |
16189.79 |
32366.39 |
单位:
Images/s
, 自动并行模式,DistributedStrategy
如下:
import paddle.distributed.fleet as fleet
dist_strategy = fleet.DistributedStrategy()
dist_strategy.auto = True
为了获得更好的性能,我们默认打开了DALI进行数据IO,这里注意单机单开的自动并行开启的选项可能和多卡不同,因此加速比不具备参考意义。
batch / node |
1 x 1 |
1 x 8 |
2 x 8 |
4 x 8 |
---|---|---|---|---|
32 |
666.38 |
4467.82 |
8711.69 |
19107.42 |
64 |
761 |
6148.98 |
12076.77 |
24314.58 |
128 |
890.03 |
6793.73 |
13514.66 |
27277.36 |
256 |
938.57 |
7305.66 |
14599.55 |
29361.24 |
Bert模型训练性能基线¶
Transformer模型性能基线¶
VGG16模型训练性能基线¶
VGG16是当前视觉领域比较通用的预训练模型后端,同时也作为评价深度学习框架训练性能的最重要模型之一,我们提供VGG16在ImageNet数据集上的性能基准供用户参考。
软硬件配置情况¶
基本版本信息¶
软硬件指标 |
具体配置 |
---|---|
实例类型 |
百度X-Man 2.0 |
单实例GPU |
8x NVIDIA® Tesla® V100 |
操作系统 |
Ubuntu 16.04 LTS with tests run via Docker |
CPU |
Intel(R) Xeon(R) Gold 6271C CPU @ 2.60GHz |
内存 |
512G |
CUDA / CUDNN版本 |
10.1 / 7.6.5 |
NCCL / DALI 版本 |
2.4.7 / 0.24.0 |
多GPU实例互联信息 |
InfiniBand 100 Gb/sec |
Paddle Github Commit |
b6711e24ea5e17fa24e9b1ba516fe03186dd4a2c |
FleetX Github Commit |
296e0d71a33a8d250f6626733bc2535465f5f6e0 |
硬盘类型 |
本地SSD硬盘 |
数据集 |
ImageNet |
评估模型 |
VGG16 |
复现代码地址 |
|
Python版本 |
3.7 |
硬件拓扑¶
nvidia-smi topo -m
GPU0 GPU1 GPU2 GPU3 GPU4 GPU5 GPU6 GPU7 mlx5_0 CPU Affinity
GPU0 X NV2 NV2 NV1 NV1 NODE NODE NODE NODE 0-23
GPU1 NV2 X NV1 NV1 NODE NV2 NODE NODE NODE 0-23
GPU2 NV2 NV1 X NV2 NODE NODE NV1 NODE NODE 0-23
GPU3 NV1 NV1 NV2 X NODE NODE NODE NV2 NODE 0-23
GPU4 NV1 NODE NODE NODE X NV2 NV2 NV1 NODE 0-23
GPU5 NODE NV2 NODE NODE NV2 X NV1 NV1 NODE 0-23
GPU6 NODE NODE NV1 NODE NV2 NV1 X NV2 NODE 0-23
GPU7 NODE NODE NODE NV2 NV1 NV1 NV2 X NODE 0-23
mlx5_0 NODE NODE NODE NODE NODE NODE NODE NODE X
Legend:
X = Self
SYS = Connection traversing PCIe as well as the SMP interconnect between NUMA nodes (e.g., QPI/UPI)
NODE = Connection traversing PCIe as well as the interconnect between PCIe Host Bridges within a NUMA node
PHB = Connection traversing PCIe as well as a PCIe Host Bridge (typically the CPU)
PXB = Connection traversing multiple PCIe switches (without traversing the PCIe Host Bridge)
PIX = Connection traversing a single PCIe switch
NV# = Connection traversing a bonded set of # NVLinks
性能测试方法¶
硬件资源 采用多机多卡训练,以实例数 x 单实例GPU卡数作为评价标准,评价
1 x 1
,1 x 8
,2 x 8
,4 x 8
情况下的性能基准。训练超参数 批量大小(Batch Size)对训练性能影响最大,因此会对比不同批量大小下模型的训练吞吐。注意,改变批量大小通常需要调整优化算法,但为了对比公平,暂不对优化算法做调整 ,即不考虑收敛的对比。
测试指标获取方法 当前主流的深度学习框架通常采用异步数据读取,由于训练开始前框架并没有提前开始读取数据,整个训练速度存在一定的IO瓶颈。我们的测试方法是忽略前20个
step
,然后取后 面100个step的平均吞吐作为单次任务的训练吞吐。为了避免硬件的波动(例如网络通信等)对性能的影响,我们会利用相同的配置进行7次运行,取中值。
基准测试结果¶
单位:
Images/s
,使用精度FP32,DistributedStrategy
如下:
exec_strategy = fluid.ExecutionStrategy()
dist_strategy = fleet.DistributedStrategy()
exec_strategy.num_threads = 2
exec_strategy.num_iteration_per_drop_scope = 100
dist_strategy.execution_strategy = exec_strategy
build_strategy = fluid.BuildStrategy()
build_strategy.enable_inplace = False
build_strategy.fuse_elewise_add_act_ops = True
build_strategy.fuse_bn_act_ops = True
dist_strategy.build_strategy = build_strategy
dist_strategy.nccl_comm_num = 1
batch / node |
1 x 1 |
1 x 8 |
2 x 8 |
4 x 8 |
---|---|---|---|---|
32 |
252.68 |
1888.20 |
2873.18 |
5548.48 |
64 |
261.33 |
1980.04 |
3900.12 |
7617.47 |
128 |
266.24 |
2027.06 |
4028.78 |
7848.70 |
单位:
Images/s
,使用自动混合精度Automatic Mixed Precision(AMP)进行训练,DistributedStrategy
如下:
import paddle
import paddle.distributed.fleet as fleet
dist_strategy = fleet.DistributedStrategy()
exec_strategy = fluid.ExecutionStrategy()
exec_strategy.num_threads = 2
exec_strategy.num_iteration_per_drop_scope = 100
dist_strategy.execution_strategy = exec_strategy
build_strategy = fluid.BuildStrategy()
build_strategy.enable_inplace = False
build_strategy.fuse_elewise_add_act_ops = True
build_strategy.fuse_bn_act_ops = True
dist_strategy.build_strategy = build_strategy
dist_strategy.amp = True
dist_strategy.nccl_comm_num = 1
batch / node |
1 x 1 |
1 x 8 |
2 x 8 |
4 x 8 |
---|---|---|---|---|
32 |
407.69 |
3332.17 |
5136.50 |
9544.81 |
64 |
468.51 |
3708.32 |
7112.45 |
14013.01 |
128 |
512.02 |
3892.58 |
7618.34 |
15219.57 |
256 |
439.47 |
3409.96 |
6779.20 |
13443.23 |
单位:
Images/s
, 自动并行模式,DistributedStrategy
如下:
import paddle.distributed.fleet as fleet
dist_strategy = fleet.DistributedStrategy()
dist_strategy.auto = True
为了获得更好的性能,我们默认打开了DALI进行数据IO,这里注意单机单开的自动并行开启的选项可能和多卡不同,因此加速比不具备参考意义。
batch / node |
1 x 1 |
1 x 8 |
2 x 8 |
4 x 8 |
---|---|---|---|---|
32 |
409.68 |
3044.60 |
4840.74 |
7668.70 |
64 |
455.98 |
3395.67 |
6525.20 |
12237.04 |
128 |
472.81 |
3587.29 |
7019.13 |
13562.80 |
256 |
407.88 |
3154.15 |
6217.92 |
12147.46 |
Word2vec模型性能基准¶
word2vec被广泛运用于NLP及推荐系统领域,同时也作为评价深度学习框架训练性能的重要模型之一,我们提供word2vec在1-billion数据集上的分布式训练的性能基准供用户参考,包含4、8、16、32台服务器联合训练。
软硬件配置情况¶
基本版本信息¶
软硬件指标 |
具体配置 |
---|---|
实例类型 |
纯CPU训练集群 |
操作系统 |
Ubuntu 16.04 LTS with tests run via Docker |
CPU |
Intel(R) Xeon(R) CPU E5-2450 v2 @ 2.50GHz |
内存 |
128G |
Paddle Github Commit |
|
FleetX Github Commit |
|
硬盘类型 |
本地SSD硬盘 |
数据集 |
1-billion |
评估模型 |
Work2vec |
复现代码地址 |
|
Python版本 |
3.7 |
性能测试方法¶
硬件资源 采用多机多进程训练,每一台服务器实例均启动一个pserver,一个trainer,每个trainer配置固定数量的线程,以实例数作为评价标准,评价
4
,8
,16
,32
情况下的性能基准。训练超参数 批量大小(Batch Size)对训练性能影响最大,因此会对比不同批量大小下模型的训练吞吐。注意,改变批量大小通常需要调整优化算法,但为了对比公平,暂不对优化算法做调整>,即不考虑收敛的对比。
测试指标获取方法 当前主流的深度学习框架通常采用异步数据读取,由于训练开始前框架并没有提前开始读取数据,整个训练速度存在一定的IO瓶颈。我们的测试方法是统计一轮训练中总的字数,用一轮总字数除总耗时得到的平均吞吐作为单次任务的训练吞吐。为了避免硬件的波动(例如网络通信等)对性能的影响,我们会利用相同的配置进行7次运行,取中值。
基准测试结果¶
单位:
Images/s
,使用精度FP32,DistributedStrategy
如下:
import paddle
import paddle.distributed.fleet as fleet
dist_strategy = fleet.DistributedStrategy()
dist_strategy.a_sync=True
dist_strategy.a_sync_configs = {"k_steps": 100}
batch / node |
4 |
8 |
16 |
32 |
---|---|---|---|---|
100 |
55597.5 |
55082.37 |
53302.63 |
47280.91 |
用户FAQ¶
TBA
FleetX使用Apache License 2.0开源协议