欢迎来到飞桨分布式技术文档主页

  • 欢迎您关注飞桨分布式训练,我们希望能帮助每一个用户走上大规模工业化生产之路!

整体介绍与内容概览

欢迎关注大规模深度学习技术

近十年来,深度学习技术不断刷新视觉、自然语言、语音、搜索、推荐等领域各种任务的记录。这其中的原因,用一个关键词描述就是“大规模”。大规模的数据使得模型有足够的知识可以记忆,大规模参数量的模型使得模型本身有能力记忆更多的数据,大规模高性能的算力(以GPU为典型代表)使得模型的训练速度有百倍甚至千倍的提升。数据、模型、算力的发展催生了大规模深度学习这个领域,如何进行多机任务的拆分、如何配置集群训练资源、如何平衡训练速度和收敛速度、如何训练单机无法训练的模型、弹性训练与容错等都是这个方向重点研究的问题。

飞桨分布式训练提供的核心价值

  1. 源自产业实践的经验:

  • 飞桨的分布式训练技术源自百度的业务实践,是经过超大规模业务数据检验过的训练框架。

  • 飞桨分布式训练经过实践检验的应用领域包括自然语言处理,计算机视觉,搜索,推荐等。

  1. 完备的并行模式:

  • 数据并行:针对产业界最常用的数据并行模式,飞桨针对实际业务需求重点打磨多项技术,包括;飞桨提供集合通信架构和参数服务器架构两种方式,支持工业实践中常见的同步训练和异步训练的机制,并提供收敛效果有保障的分布式优化算法。

  • 流水线并行:面向异构硬件,流水线并行能够将模型计算部分拆分到不同硬件并充分流水线化,从而大规模提升异构硬件的整体利用率。

  • 模型并行:对于超大规模分类问题,飞桨提供计算与存储同时并行的模型并行,解决单GPU无法解决的问题。

  1. 面向云端场景的并行训练组件:

  • 飞桨针对集群网络环境、硬件设备比较低配的场景提供多种实用的并行策略和优化算法。

  • 针对云端算力具有弹性的特点,飞桨也始终在探索弹性深度学习的应用。

开始你的分布式训练之旅

RoadMap

  • 我们也会推送大规模深度学习技术领域最前沿的技术到这里

    • 近期:千亿规模模型参数的GPU多机多卡训练,敬请期待

安装Paddle与FleetX

Paddle

使用飞桨进行分布式训练的最小安装集合就是安装Paddle。从Paddle 2.0版本开始,我们面向不同用户群体提供不同类型的分布式训练API。

  • 面向算法工程师为主的高级API paddle.distributed.fleet

  • 面向具有分布式训练底层工程开发能力的工程师提供的API paddle.distributed

  • 您只需要安装Paddle,就可以获得飞桨团队官方提供的所有分布式训练功能。

pip install paddlepaddle-gpu

关于安装Paddle,这里 有更完备的安装指南供您参考。

FleetX

更大规模的数据、能够记忆并泛化大数据的模型、超大规模算力是利用深度学习技术提升业务效果的有效方法。我们针对需要大规模数据、大容量模型并需要进行高性能分布式训练的应用场景,开发了 FleetX 工具包。

  • 在数据维度,提供用户可以快速定义的标准公开数据集以及低门槛替换自己业务数据集的接口。

  • 在模型维度,FleetX提供典型的分布式训练场景下最常用的标准模型供用户直接使用,例如标准的预训练模型Resnet50、Ernie、Bert等。

  • 在利用大规模算力集群方面,FleetX使用Paddle原生提供的分布式训练能力,面向不同的模型提供最佳的分布式训练实践,在保证收敛效果的前提下最大化用户的集群使用效率。

pip install fleet-x==0.0.7

或者使用我们为用户提供了已经编译好的安装包,可以下载到本机后下载:

# python2
wget --no-check-certificate https://fleet.bj.bcebos.com/fleet_x-0.0.7-py2-none-any.whl
pip install fleet_x-0.0.4-py2-none-any.whl
# python3
wget --no-check-certificate https://fleet.bj.bcebos.com/fleet_x-0.0.7-py3-none-any.whl
pip3 install fleet_x-0.0.4-py3-none-any.whl

静态图分布式训练快速开始

对于大部分用户来讲,数据并行训练基本可以解决实际业务中的训练要求。我们以一个非常简单的神经网络为例,介绍如何使用飞桨高级分布式API paddle.distributed.fleet进行数据并行训练。在数据并行方式下,通常可以采用两种架构进行并行训练,即集合通信训练(Collective Training)和参数服务器训练(Parameter Server Training),接下来的例子会以同样的模型来说明两种架构的数据并行是如何实现的。

版本要求

  • paddlepaddle-2.0.0-rc-cpu / paddlepaddle-2.0.0-rc-gpu及以上

模型描述

为了方便说明,我们采用两层全连接网络的分类模型,并使用CrossEntropyLoss来评价模型是否优化的符合目标,数据方面我们采用Paddle内置的Mnist数据集,存放在model.py

import paddle
import paddle.static.nn as nn

paddle.enable_static()
def mnist_on_mlp_model():
    train_dataset = paddle.vision.datasets.MNIST(mode='train')
    test_dataset = paddle.vision.datasets.MNIST(mode='test')
    x = paddle.data(name="x", shape=[64, 1, 28, 28], dtype='float32')
    y = paddle.data(name="y", shape=[64, 1], dtype='int64')
    x_flatten = paddle.reshape(x, [64, 784])
    fc_1 = nn.fc(input=x_flatten, size=128, act='tanh')
    fc_2 = nn.fc(input=fc_1, size=128, act='tanh')
    prediction = nn.fc(input=[fc_2], size=10, act='softmax')
    cost = paddle.fluid.layers.cross_entropy(input=prediction, label=y)
    acc_top1 = paddle.fluid.layers.accuracy(input=prediction, label=y, k=1)
    avg_cost = paddle.fluid.layers.mean(x=cost)
    return train_dataset, test_dataset, x, y, avg_cost, acc_top1

采用GPU多机多卡进行同步训练

collective_trainer.py

import os
import paddle
import paddle.distributed.fleet as fleet
from model import mnist_on_mlp_model

train_data, test_data, x, y, cost, acc = mnist_on_mlp_model()
place = paddle.CUDAPlace(int(os.environ.get('FLAGS_selected_gpus', 0)))
train_dataloader = paddle.io.DataLoader(
    train_data, feed_list=[x, y], drop_last=True,
    places=place, batch_size=64, shuffle=True)
fleet.init(is_collective=True)
strategy = fleet.DistributedStrategy()
#optimizer = paddle.optimizer.Adam(learning_rate=0.01)
optimizer = paddle.fluid.optimizer.Adam(learning_rate=0.001)
optimizer = fleet.distributed_optimizer(optimizer, strategy=strategy)
optimizer.minimize(cost)

exe = paddle.static.Executor(place)
exe.run(paddle.static.default_startup_program())

epoch = 10
step = 0
for i in range(epoch):
    for data in train_dataloader():
        step += 1
        loss_val, acc_val = exe.run(
          paddle.static.default_main_program(),
          feed=data, fetch_list=[cost.name, acc.name])
  • 单机四卡训练启动命令

fleetrun --gpus 0,1,2,3 collective_trainer.py

采用参数服务器进行多机训练

parameter_server_trainer.py

import paddle
import paddle.distributed.fleet as fleet
from model import mnist_on_mlp_model

paddle.enable_static()

train_data, test_data, x, y, cost, acc = mnist_on_mlp_model()

fleet.init()
strategy = fleet.DistributedStrategy()
strategy.a_sync = True
optimizer = paddle.fluid.optimizer.Adam(learning_rate=0.001)
optimizer = fleet.distributed_optimizer(optimizer, strategy)
optimizer.minimize(cost)

if fleet.is_server():
   fleet.init_server()
   fleet.run_server()
else:
   place = paddle.CPUPlace()
   exe = paddle.static.Executor(place)
   exe.run(paddle.static.default_startup_program())
   fleet.init_worker()

   train_dataloader = paddle.io.DataLoader(
      train_data, feed_list=[x, y], drop_last=True, places=place,
      batch_size=64, shuffle=True)

   epoch = 1
   for i in range(epoch):
      for data in train_dataloader():
         cost_val, acc_val = exe.run(
            paddle.static.default_main_program(),
            feed=data, fetch_list=[cost.name, acc.name])
         print("loss: {}, acc: {}".format(cost_val, acc_val))
   fleet.stop_worker()
  • 两节点Server,两节点Worker的启动命令

fleetrun --worker_num 2 --server_num 2 parameter_server_trainer.py

动态图分布式训练快速开始

Paddle官方文档中对动态图(命令式编程)做了比较详细的介绍。Paddle的分布式高级API paddle.distributed.fleet接口从Paddle 2.0-RC版本开始支持动态图分布式任务执行。本篇文章我们将介绍如何使用 paddle.distributed.fleet接口进行动态图分布式训练。接下来我们以一个简单全连接网络实例为例,说明如何将单机单卡训练改成分布式单机多卡训练,再到多机多卡训练。

注:目前paddle.distributed.fleet启动动态图分布式训练仅支持集合通信(Colletive Communication)模式,不支持参数服务器(Parameter-Server)模式。本文示例为集合通信模式任务。

版本要求

  • paddlepaddle 2.0-rc-gpu版本及以上

单机单卡训练

下面是一个非常简单的动态图单机单卡程序。网络只有只有2层全连接层,用均方差误差(MSELoss)作为损失函数,Adam优化器进行参数的更新。用随机产生的数据进行训练,循环迭代20轮中,每轮打印出当前网络具体的损失值。

# -*- coding: UTF-8 -*-
import paddle
import paddle.nn as nn

# 定义全连接网络,需继承自nn.Layer
class LinearNet(nn.Layer):
    def __init__(self):
        super(LinearNet, self).__init__()
        self._linear1 = nn.Linear(10, 10)
        self._linear2 = nn.Linear(10, 1)

    def forward(self, x):
        return self._linear2(self._linear1(x))


# 1.开启动态图模式
paddle.disable_static()

# 2. 定义网络对象,损失函数和优化器
layer = LinearNet()
loss_fn = nn.MSELoss()
adam = paddle.optimizer.Adam(
learning_rate=0.001, parameters=layer.parameters())


for step in range(20):
    # 3. 执行前向网络
    inputs = paddle.randn([10, 10], 'float32')
    outputs = layer(inputs)
    labels = paddle.randn([10, 1], 'float32')
    loss = loss_fn(outputs, labels)

    print("step:{}\tloss:{}".format(step, loss.numpy()))

    # 4. 执行反向计算和参数更新
    loss.backward()
    adam.step()
    adam.clear_grad()

将以上代码保存为train_single.py,运行python train_single.py,您将看到显示如下日志信息:

step:0  loss:[1.2709768]
step:1  loss:[0.7705929]
step:2  loss:[2.2044802]
step:3  loss:[1.6021421]
step:4  loss:[2.0286825]
step:5  loss:[0.7866151]
step:6  loss:[1.926115]
step:7  loss:[0.3647427]
...

单机多卡训练

使用Fleet接口进行动态图分布式训练其实非常的简单,只需修改3个步骤:

  1. 导入paddle.distributed.fleet

from paddle.distributed import fleet
  1. 初始化fleet环境

fleet.init(is_collective=True)
  1. 通过fleet获取分布式优化器和分布式模型

strategy = fleet.DistributedStrategy()
adam = fleet.distributed_optimizer(adam, strategy=strategy)
dp_layer = fleet.distributed_model(layer)

说明:目前静态图DistributedStrategy下的分布式策略正逐步向动态图场景迁移中,敬请期待!

根据我们最开始提供的单机单卡代码示例,再根据3步口诀进行修改,完整的单机多卡示例代码如下:

# -*- coding: UTF-8 -*-
import paddle
import paddle.nn as nn
#分布式step 1: 导入paddle.distributed.fleet包
from paddle.distributed import fleet

# 定义全连接网络,需继承自nn.Layer
class LinearNet(nn.Layer):
    def __init__(self):
        super(LinearNet, self).__init__()
        self._linear1 = nn.Linear(10, 10)
        self._linear2 = nn.Linear(10, 1)

    def forward(self, x):
        return self._linear2(self._linear1(x))


# 1.开启动态图模式
paddle.disable_static()

# 分布式step 2: 初始化fleet
fleet.init(is_collective=True)

# 2. 定义网络对象,损失函数和优化器
layer = LinearNet()
loss_fn = nn.MSELoss()
adam = paddle.optimizer.Adam(
    learning_rate=0.001, parameters=layer.parameters())

# 分布式step 3: 通过fleet获取分布式优化器和分布式模型
strategy = fleet.DistributedStrategy()
adam = fleet.distributed_optimizer(adam, strategy=strategy)
dp_layer = fleet.distributed_model(layer)


for step in range(20):
    # 3. 执行前向网络
    inputs = paddle.randn([10, 10], 'float32')
    outputs = dp_layer(inputs)
    labels = paddle.randn([10, 1], 'float32')
    loss = loss_fn(outputs, labels)

    print("step:{}\tloss:{}".format(step, loss.numpy()))

    # 4. 执行反向计算和参数更新
    loss.backward()

    adam.step()
    adam.clear_grad()

将以上代码保存为train_fleet.py,假设要运行2卡的任务,那么只需在命令行中执行:

fleetrun --gpus=0,1 dygraph_fleet.py

您将看到显示如下日志信息:

-----------  Configuration Arguments -----------
gpus: 0,1
ips: 127.0.0.1
log_dir: log
server_num: None
servers:
training_script: dygraph_fleet.py
training_script_args: []
worker_num: None
workers:
------------------------------------------------
INFO 2020-0X-XX 08:33:30,247 launch.py:441] Run collective gpu mode. gpu arguments:['--gpus'], cuda count:8
INFO 2020-0X-XX 08:33:30,247 launch_utils.py:430] Local start 2 processes. First process distributed environment info (Only For Debug):
   +=======================================================================================+
   |                        Distributed Envs                      Value                    |
   +---------------------------------------------------------------------------------------+
   |                 PADDLE_CURRENT_ENDPOINT                 127.0.0.1:59664               |
   |                     PADDLE_TRAINERS_NUM                        2                      |
   |                     FLAGS_selected_gpus                        0                      |
   |                PADDLE_TRAINER_ENDPOINTS         127.0.0.1:59664,127.0.0.1:48993       |
   |                       PADDLE_TRAINER_ID                        0                      |
   +=======================================================================================+
step:0  loss:[1.3279431]
step:1  loss:[2.5023699]
step:2  loss:[3.3197324]
step:3  loss:[2.6869867]
step:4  loss:[2.6306524]
step:5  loss:[1.9267073]
step:6  loss:[1.2037501]
step:7  loss:[1.1434236]
...

完整2卡的日志信息也可在./log/目录下查看。了解更多fleetrun的用法可参考左侧文档fleetrun 启动分布式任务

多机多卡训练

从单机多卡到多机多卡训练,在代码上并不需要做任何改动,只需修改启动命令,以2机4卡为例:

fleetrun --ips="xx.xx.xx.xx,yy.yy.yy.yy" --gpus=0,1 dygraph_fleet.py

在2台机器上分别运行以上启动命令,fleetrun将在后台分别启动2个多进程任务,执行分布式多机训练。 您将在ip为xx.xx.xx.xx的机器上看到命令台输出日志信息:

-----------  Configuration Arguments -----------
gpus: None
ips: xx.xx.xx.xx,yy.yy.yy.yy
log_dir: log
server_num: None
servers:
training_script: dygraph_fleet.py
training_script_args: []
worker_num: None
workers:
------------------------------------------------
INFO 2020-0X-XX 21:29:41,918 launch.py:434] Run collective gpu mode. gpu arguments:['--ips'], cuda count:2
INFO 2020-0X-XX 21:29:41,919 launch_utils.py:426] Local start 2 processes. First process distributed environment info (Only For Debug):
    +=======================================================================================+
    |                        Distributed Envs                      Value                    |
    +---------------------------------------------------------------------------------------+
    |                 PADDLE_CURRENT_ENDPOINT               xx.xx.xx.xx:6070              |
    |                     PADDLE_TRAINERS_NUM                        4                      |
    |                     FLAGS_selected_gpus                        0                      |
    |                PADDLE_TRAINER_ENDPOINTS  ... :6071,yy.yy.yy.yy:6070,yy.yy.yy.yy:6071|
    |                       PADDLE_TRAINER_ID                        0                      |
    +=======================================================================================+
step:0  loss:[5.2519045]
step:1  loss:[3.139771]
step:2  loss:[2.0075738]
step:3  loss:[1.4674551]
step:4  loss:[4.0751777]
step:5  loss:[2.6568782]
step:6  loss:[1.1998112]
...

同样完整的日志信息也分别在xx.xx.xx.xx机器和yy.yy.yy.yy机器上的./log/目录下查看。

小结

至此,相信您已经通过3步口诀掌握了如何将一个普通的paddle动态图单卡任务转换为多卡任务。推荐使用单卡进行调试,真正执行训练时切换为多卡任务。我们也将在未来继续完善Fleet动态图模块,通过与静态图类似的方式实现分布式训练任务在不同场景下的优化,敬请期待!

使用fleetrun启动分布式任务

Paddle提供命令行启动命令fleetrun,配合Paddle的分布式高级APIpaddle.distributed.fleet 即可轻松启动Paddle集合通信模式或参数服务器模式下的分布式任务。 fleetrun在静态图和动态图场景下均可使用。

使用要求

使用fleetrun命令的要求:

  • 安装 paddlepaddle 2.0-rc 及以上

使用说明

fleetrun使用场景主要分为集合通信训练(Collective Training)和参数服务器训练(Parameter Server Training)。 集合通信训练一般在GPU设备上运行,因此我们将介绍GPU单机单卡,单机多卡和多机多卡场景下使用fleetrun的方法。 参数服务器训练包含服务节点、训练节点以及异构训练节点的启动, 因此我们将介绍在CPU集群、GPU集群上和异构集群上如何使用fleetrun启动分布式训练 。fleetrun支持在百度公司内部云PaddleCloud上运行分布式任务,推荐结合fleetsub命令,一键快速提交集群任务。详情请参考使用fleetsub提交集群任务

你也可以使用 python -m paddle.distributed.launch 来启动训练任务,事实上, fleetrun是前者的快捷方式。

集合通信训练

  • GPU单机单卡训练

单机单卡有两种方式:一种可直接使用python执行,也可以使用fleetrun执行。推荐使用fleetrun启动方法。

【方法一】直接使用python执行

export CUDA_VISIBLE_DEVICES=0
python train.py

【方法二】使用fleetrun执行

fleetrun --gpus=0 train.py

注:如果指定了export CUDA_VISIBLE_DEVICES=0 ,则可以直接使用:

export CUDA_VISIBLE_DEVICES=0
fleetrun train.py
  • GPU单机多卡训练

若启动单机4卡的任务,只需通过--gpus指定空闲的4张卡即可。

fleetrun --gpus=0,1,2,3 train.py

注:如果指定了export CUDA_VISIBLE_DEVICES=0,1,2,3,则可以直接使用:

export CUDA_VISIBLE_DEVICES=0,1,2,3
fleetrun train.py
  • GPU多机多卡训练

[示例一] 2机8卡 (每个节点4卡)

fleetrun --ips="xx.xx.xx.xx,yy.yy.yy.yy" --gpus=0,1,2,3 train.py

注:如果每台机器均指定了export CUDA_VISIBLE_DEVICES=0,1,2,3,则可以直接在每台节点上启动:

export CUDA_VISIBLE_DEVICES=0,1,2,3
fleetrun --ips="xx.xx.xx.xx,yy.yy.yy.yy" train.py

[示例二] 2机16卡(每个节点8卡,假设每台机器均有8卡可使用)

fleetrun --ips="xx.xx.xx.xx,yy.yy.yy.yy" train.py

参数服务器训练

在CPU集群运行参数服务器
  • 参数服务器训练 - 单机模拟分布式训练

1台机器通过多进程模拟分布式训练,1个服务节点搭配4个训练节点。

fleetrun启动时只需指定服务节点数--server_num和训练节点数--worker_num,即可进行单机模拟分布式训练,推荐使用此方法进行本地调试

fleetrun --server_num=1 --worker_num=4 train.py
  • 参数服务器训练 - 自定义多机训练

fleetrun启动时只需指定服务节点的ip和端口列表--servers 和训练节点的ip列表--workers ,即可进行多机训练。 下列示例中,xx.xx.xx.xx代表机器1,yy.yy.yy.yy代表机器2,6170代表用户指定的服务节点的端口。fleetrun将分别在2台机器上启动1个服务节点,4个训练节点。

# 2个servers 8个workers
fleetrun --servers="xx.xx.xx.xx:6170,yy.yy.yy.yy:6171" --workers="xx.xx.xx.xx,xx.xx.xx.xx,xx.xx.xx.xx,xx.xx.xx.xx,yy.yy.yy.yy,yy.yy.yy.yy,yy.yy.yy.yy,yy.yy.yy.yy" train.py

--workers参数可以仅指定ip列表,此时fleetrun将会在启动训练任务前分配好连续端口给每个训练节点。fleetrun分配的连续端口可能会出现端口被其他任务占用的情况,此时多机训练无法正常启动。因此--workers参数支持配置用户指定端口,写法与--servers一致,示例如下:

# 2个servers 8个workers
fleetrun --servers="xx.xx.xx.xx:6170,yy.yy.yy.yy:6171" --workers="xx.xx.xx.xx:6172,xx.xx.xx.xx:6173,xx.xx.xx.xx:6174,xx.xx.xx.xx:6175,yy.yy.yy.yy:6176,yy.yy.yy.yy:6177,yy.yy.yy.yy:6178,yy.yy.yy.yy:6179" train.py
在GPU集群运行参数服务器
  • 参数服务器训练 - 单机模拟分布式训练

1台机器通过多进程模拟,2个服务节点搭配4个训练节点,每个训练节点占用一张GPU卡,服务节点不占用GPU卡。

# 2个server 4个worker
export CUDA_VISIBLE_DEVICES=0,1,2,3
fleetrun --server_num=2 --worker_num=4 train.py

1台机器通过多进程模拟, 2个服务节点搭配2个训练节点,两个训练节点共用一张GPU卡,服务节点不占用GPU卡。

# 2个server 2个worker
export CUDA_VISIBLE_DEVICES=0
fleetrun --server_num=2 --worker_num=2 train.py
  • 参数服务器训练 - 自定义多机训练

fleetrun启动时只需指定服务节点的ip和端口列表--servers 和 训练节点的ip和端口列表--workers ,即可进行多机训练。

以下示例中,xx.xx.xx.xx代表机器1,yy.yy.yy.yy代表机器2,6170代表用户指定的服务节点的端口。fleetrun将分别在2台机器上启动1个服务节点,1个训练节点。训练节点会分别占用其机器上的0号GPU卡进行训练。

# 2台机器,每台机器均有1个服务节点,1个训练节点
# 2个server 2个worker
# 每台机器均指定了可用设备 GPU:0
export CUDA_VISIBLE_DEVICES=0
fleetrun --servers="xx.xx.xx.xx:6170,yy.yy.yy.yy:6171" --workers="xx.xx.xx.xx:6172,yy.yy.yy.yy:6173" train.py

以下示例中,fleetrun将分别在2台机器上启动1个服务节点,4个训练节点。训练节点会分别占用其机器上的0,1,2,3号GPU卡进行训练。

# 2台机器,每台机器均有1个服务节点,4个训练节点
# 2个server 4个worker
# 每台机器均指定了可用设备 GPU:0,1,2,3
export CUDA_VISIBLE_DEVICES=0,1,2,3
fleetrun --servers="xx.xx.xx.xx:6170,yy.yy.yy.yy:6171" --workers="xx.xx.xx.xx:6172,xx.xx.xx.xx:6173,xx.xx.xx.xx:6174,xx.xx.xx.xx:6175,yy.yy.yy.yy:6176,yy.yy.yy.yy:6177,yy.yy.yy.yy:6178,yy.yy.yy.yy:6179" train.py
异构集群运行参数服务器
  • 参数服务器训练 - 单机模拟分布式训练

1台机器通过多进程模拟,2个服务节点搭配2个训练节点以及2个异构训练节点,每个异构训练节点占用一张GPU卡,其余服务节点和训练节点均在CPU上执行。

# 2个server 4个worker
export CUDA_VISIBLE_DEVICES=0,1
fleetrun --server_num=2 --worker_num=2 --heter_worker_num=2 train.py

fleetrun命令参数介绍

  • Collective模式相关参数:

    • ips (str,可选): 指定选择哪些节点IP进行训练,默认为『127.0.0.1』, 即会在本地执行单机单卡或多卡训练。

    • gpus(str, 可选):指定选择哪些GPU卡进行训练,默认为None,即会选择 CUDA_VISIBLE_DEVICES所显示的所有卡。不设置 nproc_per_node参数时,将启动GPU个数个进程进行训练,每个进程绑定一个GPU卡。

    • nproc_per_node (int, 可选):设置多少个进程进行训练。设置数目需要少于或者等于参与训练的GPU的个数以便每个进程可以绑定一个或者平均个数的GPU卡;不能使用GPU训练时,会启动相应个数的CPU进程进行Collective训练。

  • 参数服务器模式可配参数:

    • server_num(int,可选):单机模拟分布式任务中,指定参数服务器服务节点的个数

    • worker_num(int,可选):单机模拟分布式任务中,指定参数服务器训练节点的个数

    • heter_worker_num(int,可选):在异构集群中启动单机模拟分布式任务,指定参数服务器异构训练节点的个数

    • servers(str, 可选): 多机分布式任务中,指定参数服务器服务节点的IP和端口

    • workers(str, 可选): 多机分布式任务中,指定参数服务器训练节点的IP和端口,也可只指定IP

    • heter_workers(str, 可选): 在异构集群中启动分布式任务,指定参数服务器异构训练节点的IP和端口

    • http_port(int, 可选):参数服务器模式中,用Gloo启动时设置的连接端口

  • 其他:

    • log_dir(str, 可选): 指定分布式任务训练日志的保存路径,默认保存在“./log/”目录。

使用fleetrun进行GPU多卡训练实例

下面我们将通过例子,为您详细介绍如何利用fleetrun将单机单卡训练任务转换为单机多卡训练任务。 这里使用与静态图分布式训练快速开始 相同的示例代码进行说明。

import os
import time
import paddle
import paddle.distributed.fleet as fleet
import paddle.static.nn as nn
import paddle.fluid as fluid

def mnist_on_mlp_model():
    train_dataset = paddle.vision.datasets.MNIST(mode='train')
    test_dataset = paddle.vision.datasets.MNIST(mode='test')
    x = paddle.data(name="x", shape=[64, 1, 28, 28], dtype='float32')
    y = paddle.data(name="y", shape=[64, 1], dtype='int64')
    x_flatten = fluid.layers.reshape(x, [64, 784])
    fc_1 = nn.fc(input=x_flatten, size=128, act='tanh')
    fc_2 = nn.fc(input=fc_1, size=128, act='tanh')
    prediction = nn.fc(input=[fc_2], size=10, act='softmax')
    cost = fluid.layers.cross_entropy(input=prediction, label=y)
    acc_top1 = fluid.layers.accuracy(input=prediction, label=y, k=1)
    avg_cost = fluid.layers.mean(x=cost)
    return train_dataset, test_dataset, x, y, avg_cost, acc_top1

paddle.enable_static()
train_data, test_data, x, y, cost, acc = mnist_on_mlp_model()
place = paddle.CUDAPlace(int(os.environ.get('FLAGS_selected_gpus', 0)))
train_dataloader = paddle.io.DataLoader(
    train_data, feed_list=[x, y], drop_last=True,
    places=place, batch_size=64, shuffle=True)
fleet.init(is_collective=True)
strategy = fleet.DistributedStrategy()
#optimizer = paddle.optimizer.Adam(learning_rate=0.01)
optimizer = fluid.optimizer.Adam(learning_rate=0.001)
optimizer = fleet.distributed_optimizer(optimizer, strategy=strategy)
optimizer.minimize(cost)

exe = paddle.static.Executor(place)
exe.run(paddle.static.default_startup_program())

epoch = 10
for i in range(epoch):
    total_time = 0
    step = 0
    for data in train_dataloader():
        step += 1
        start_time = time.time()
        loss_val, acc_val = exe.run(
          paddle.static.default_main_program(),
          feed=data, fetch_list=[cost.name, acc.name])
        if step % 200 == 0:
            end_time = time.time()
            total_time += (end_time - start_time)
            print(
                    "epoch: %d, step:%d, train_loss: %f, total time cost = %f, speed: %f"
                % (i, step, loss_val[0], total_time,
                   1 / (end_time - start_time) ))

单机单卡训练

将上述代码保存在train.py代码中,单机单卡训练十分的简单,只需要:

export CUDA_VISIBLE_DEVICES=0
python train.py

可以看见终端上打印日志信息:

  epoch: 0, step:200, train_loss: 0.424425, total time cost = 0.000947, speed: 1055.967774
  epoch: 0, step:400, train_loss: 0.273742, total time cost = 0.001725, speed: 1285.413423
  epoch: 0, step:600, train_loss: 0.472131, total time cost = 0.002467, speed: 1347.784062
  epoch: 0, step:800, train_loss: 0.445613, total time cost = 0.003184, speed: 1394.382979
  epoch: 1, step:200, train_loss: 0.512807, total time cost = 0.000681, speed: 1468.593838
  epoch: 1, step:400, train_loss: 0.571385, total time cost = 0.001344, speed: 1508.199928
  epoch: 1, step:600, train_loss: 0.617232, total time cost = 0.002034, speed: 1449.310297
  epoch: 1, step:800, train_loss: 0.392537, total time cost = 0.002813, speed: 1283.446756
  epoch: 2, step:200, train_loss: 0.288508, total time cost = 0.000796, speed: 1256.155735
  epoch: 2, step:400, train_loss: 0.448433, total time cost = 0.001531, speed: 1360.461888
  epoch: 2, step:600, train_loss: 0.593330, total time cost = 0.002292, speed: 1314.005013
...

单机多卡训练

从单机单卡训练到单机多卡训练不需要改动train.py代码,只需改一行启动命令:

export CUDA_VISIBLE_DEVICES=0,1,2,3
fleetrun train.py

训练日志可以在终端上查看,也可稍后在./log/目录下查看每个卡的日志。 终端可以看到显示日志如下:

-----------  Configuration Arguments -----------
gpus: 0,1,2,3
ips: 127.0.0.1
log_dir: log
server_num: None
servers:
training_script: train.py
training_script_args: []
worker_num: None
workers:
------------------------------------------------
INFO 202X-0X-0X 06:09:36,185 launch_utils.py:425] Local start 4 processes. First process distributed environment info (Only For Debug):
=======================================================================================
            Distributed Envs              Value
---------------------------------------------------------------------------------------
PADDLE_CURRENT_ENDPOINT                   127.0.0.1:33360
PADDLE_TRAINERS_NUM                       4
FLAGS_selected_gpus                       0
PADDLE_TRAINER_ENDPOINTS                  ... 0.1:11330,127.0.0.1:54803,127.0.0.1:49294
PADDLE_TRAINER_ID                         0
=======================================================================================
 epoch: 0, step:200, train_loss: 0.306129, total time cost = 0.001170, speed: 854.759323
 epoch: 0, step:400, train_loss: 0.287594, total time cost = 0.002226, speed: 947.009257
 epoch: 0, step:600, train_loss: 0.179934, total time cost = 0.003201, speed: 1025.752996
 epoch: 0, step:800, train_loss: 0.137214, total time cost = 0.005004, speed: 554.582044
 epoch: 1, step:200, train_loss: 0.302534, total time cost = 0.000975, speed: 1025.752996
 epoch: 1, step:400, train_loss: 0.375780, total time cost = 0.001934, speed: 1042.581158
 epoch: 1, step:600, train_loss: 0.247651, total time cost = 0.002892, speed: 1043.878547
 epoch: 1, step:800, train_loss: 0.086278, total time cost = 0.003845, speed: 1049.363022
.....

FleetX快速开始

FleetX是什么?

FleetX提供效率最高的分布式模型预训练功能,它可以作为paddle.distributed.fleet的扩展进行配合使用。

提供哪些功能?

  • 短代码定义预训练模型

  • 预置经典模型的公开训练数据

  • 用户可低成本替换自有数据集

  • 面向每个模型的最佳分布式训练实践

上手示例

以下通过图像分类Resnet50的例子,说明如何使用FleetX的接口进行分布式训练。具体步骤如下:

  • 导入依赖

  • 构建模型

  • 定义分布式策略

  • 开始训练

为了简化模型定义的过程,我们在后面的文档中会尽量使用FleetX封装的高级API,方便用户理解分布式训练的核心内容。

1. 导入依赖

FleetX依赖Paddle 1.8.0及之后的版本。请确认已安装正确的Paddle版本,并按照以下方式导入Paddle 及 FleetX。

import paddle
import paddle.distributed.fleet as fleet
import fleetx as X

2. 构建模型

通过FleetX提供的 X.applications 接口,用户可以使用一行代码加载一些经典的深度模型,如:Resnet50,VGG16,BERT,Transformer等。同时,用户可以使用一行代码加载特定格式的数据,如对于图像分类任务,用户可以加载ImageNet格式的数据。

paddle.enable_static()
configs = X.parse_train_configs()

model = X.applications.Resnet50()
downloader = X.utils.Downloader()
local_path = downloader.download_from_bos(fs_yml="https://xxx.xx.xx.xx/full_imagenet_bos.yml", local_path='./data')
loader = model.get_train_dataloader(local_path, batch_size=32)

3. 定义分布式策略

在定义完单机训练网络后,用户可以使用paddle.distributed.fleet.DistributedStrategy()接口定义分布式策略,将模型转成分布式模式。

# 使用paddle.distributed.fleet进行collective training
fleet.init(is_collective=True)
# 定义DistributedStrategy
dist_strategy = fleet.DistributedStrategy()
# 装饰单机optimizer为分布式optimizer
optimizer = paddle.distributed.fleet.distributed_optimizer(optimizer, dist_strategy)
optimizer.minimize(model.loss)

4. 开始训练

可以使用FleetX内置的训练器进行快速训练,方便算法工程师快速上手:

trainer = X.MultiGPUTrainer()
trainer.fit(model, loader, epoch=10)

用户也可以采用Paddle原生的API进行训练流程的定义,代码如下:

exe = fluid.Executor(place)
exe.run(fluid.default_startup_program())

for epoch_id in range(5):
    step_id = 0
    for data in loader:
        cost_val = exe.run(paddle.default_main_program(),
                   feed=data,
                   fetch_list=[model.loss.name])
        if step_id % 100 == 0:
            print("worker index: %d, epoch: %d, step: %d, train loss: %f"
                 % (fleet.worker_index(), epoch_id, step_id, cost_val[0]))

从Paddle 2.0 rc版本开始,我们统一采用fleetrun进行多卡训练的启动,方式如下:

fleetrun --gpus 0,1,2,3 resnet_app.py

关于fleetrun命令,更详细的使用说明请参考fleetrun

使用fleetsub提交集群任务

fleetsub是什么

当您安装了fleet-x后,便可以使用fleetsub在集群上提交分布式任务。长期的目标是成为集群任务提交的统一命令,只需要一行启动命令,就会将训练任务提交到离线训练集群中。目前该功能只支持百度公司内部云上的任务提交,使用fleetsub前需要先安装paddlecloud客户端,后续我们会支持更多的公有云任务提交。

使用要求

使用fleetsub命令的要求:安装fleet-x

  • 【方法一】从pip源安装

pip install fleet-x
  • 【方法二】下载whl包并在本地安装

# python2
wget --no-check-certificate https://fleet.bj.bcebos.com/fleet_x-0.0.4-py2-none-any.whl
pip install fleet_x-0.0.4-py2-none-any.whl
# python3
wget --no-check-certificate https://fleet.bj.bcebos.com/fleet_x-0.0.4-py3-none-any.whl
pip3 install fleet_x-0.0.4-py3-none-any.whl

使用说明

在提交任务前,用户需要在yaml文件中配置任务相关的信息,如:节点数、镜像地址、集群信息、启动训练所需要的命令等。

首先看一个yaml文件的样例。因为信息安全的原因yaml文件中的信息做了脱敏。

num_trainers: 4
num_cards: 8
job_prefix: bert_base_pretraining
image_addr: ${image_addr:-"dockhub.com/paddlepaddle-public/paddle_ubuntu1604:cuda10.0-cudnn7-dev"}
cluster_name: v100-32-cluster
group_name: k8s-gpu-v100-8
server: paddlecloud.server.com
log_fs_name: "afs://xx.fs.com:9902"
log_fs_ugi: "ugi_name,ugi_passwd"
log_output_path: "/xx/yy/zz"
file_dir: "./"

whl_install_commands:
  - pip install fleet_x-0.0.5-py2-none-any.whl
  - pip install paddlepaddle_gpu-0.0.0-cp27-cp27mu-linux_x86_64.whl

commands:
  - fleetrun bert_base.py --download_config=bert.yaml

字段名称

字段含义

类型

num_trainers

训练节点的数量

INT

num_cards

单节点上申请的GPU卡数

INT

job_prefix

任务名前缀

STRING

image_addr

镜像地址

{STRING}

cluster_name

集群名

STRING

group_name

群组名

STRING

server

集群master节点服务名

STRING

log_fs_name

任务日志存放的文件系统名

STRING

log_fs_ugi

任务日志存放的文件系统UGI

STRING

log_output_path

任务日志存放的目标文件系统地址

STRING

file_dir

提交任务需要上传的文件目录

STRING

whl_install_commands

安装各种wheel包的命令

Repeated Command Line

commands

运行任务执行的各种命令

Repeated Command Line

任务提交

定义完上述脚本后,用户即可使用fleetsub命令向PaddleCloud 提交任务了:

fleetsub -f demo.yaml

使用样例

具体的使用说明及样例代码请参考下面的WIKI

使用Fleet进行参数服务器训练

在大数据浪潮的推动下,有标签训练数据的规模取得了飞速的增长。现在人们通常用数百万甚至上千万的有标签图像来训练图像分类器(如,ImageNet包含1400万幅图像,涵盖两万多个种类),用成千上万小时的语音数据来训练语音模型(如,Deep Speech 2系统使用了11940小时的语音数据以及超过200万句表述来训练语音识别模型)。在真实的业务场景中,训练数据的规模可以达到上述数据集的数十倍甚至数百倍,如此庞大的数据需要消耗大量的计算资源和训练时间使模型达到收敛状态(数天时间)。

为了提高模型的训练效率,分布式训练应运而生,其中基于参数服务器的分布式训练为一种常见的中心化共享参数的同步方式。与单机训练不同的是在参数服务器分布式训练中,各个节点充当着不同的角色:

  • 训练节点:该节点负责完成数据读取、前向计算、反向梯度计算等过程,并将计算出的梯度上传至服务节点。

  • 服务节点:在收到所有训练节点传来的梯度后,该节点会将梯度聚合并更新参数。最后将参数发送给训练节点,开始新一轮的训练。

根据参数更新的方式不同,可以分为同步和异步两种:

  • 同步训练:在同步参数服务器分布式训练中,所有训练节点的进度保持一致。每训练完一个Batch后,训练节点会上传梯度,然后开始等待服务节点返回更新后的参数。服务节点拿到所有训练节点上传的梯度后,才会对参数进行更新。因此,在任何一个时间点,所有训练节点都处于相同的训练阶段。

  • 异步训练:与同步训练不同,在异步训练中任何两个训练节点之间的参数更新都互不影响。每一个训练节点完成训练、上传梯度后,服务节点都会立即更新参数并将结果返回至相应的训练节点。拿到最新的参数后,该训练节点会立即开始新一轮的训练。

下面我们将通过例子,为您介绍同步/异步训练在Fleet中的实现。

在开始之前我们首先需要下载训练中所需要的数据:

# 下载并解压数据,训练数据讲保存至名为 raw_data 的文件夹
wget --no-check-certificate https://fleet.bj.bcebos.com/ctr_data.tar.gz
tar -zxvf ctr_data.tar.gz

实用样例

下面我们来介绍如何用Fleet接口,完成参数服务器分布式训练(假设训练脚本为ctr_app.py)。

导入依赖

import paddle
import os
import fleetx as X
import paddle.fluid as fluid
import paddle.distributed.fleet as fleet
import paddle.distributed.fleet.base.role_maker as role_maker

定义分布式模式并初始化

通过X.parse_train_configs()接口,用户可以定义训练相关的参数,如:学习率、衰减率等。同时通过fleet.init()接口定义了分布式模型,init()接口默认使用参数服务器模式,所以用户不需要定 义任何参数。

paddle.enable_static()
configs = X.parse_train_configs()
role = role_maker.PaddleCloudRoleMaker()
fleet.init(role)

加载模型及数据

用户可以通过X.applications接口加载我们预先定义好的模型。在这个例子中我们将使用CTR-DNN模型,同时用户可以为模型定制的data_loader接口加载数据.

model = X.applications.MultiSlotCTR()
loader = model.load_criteo_from_file('./train_data')

定义同步训练 Strategy 及 Optimizer

在Fleet API中,用户可以使用fleet.DistributedStrategy()接口定义自己想要使用的分布式策略。

其中a_sync选项用于定义参数服务器相关的策略,当其被设定为False时,分布式训练将在同步的模式下进行。反之,当其被设定成True时,分布式训练将在异步的模式下进行。

dist_strategy = fleet.DistributedStrategy()
dist_strategy.a_sync = False

optimizer = fluid.optimizer.SGD(learning_rate=0.0001)
optimizer = fleet.distributed_optimizer(optimizer, dist_strategy)
optimizer.minimize(model.loss)

开始训练

完成模型及训练策略以后,我们就可以开始训练模型了。因为在参数服务器模式下会有不同的角色,所以根据不同节点分配不同的任务。

对于服务器节点,首先用init_server()接口对其进行初始化,然后启动服务并开始监听由训练节点传来的梯度。

同样对于训练节点,用init_worker()接口进行初始化后, 开始执行训练任务。运行X.Trainer.fit接口开始训练,并得到训练中每一步的损失值。

if fleet.is_server():
    fleet.init_server()
    fleet.run_server()
else:
    fleet.init_worker()
    trainer = X.Trainer(fluid.CPUPlace())
    trainer.fit(model, loader, epoch=10)

运行训练脚本

定义完训练脚本后,我们就可以用fleetrun指令运行分布式任务了。其中server_num, worker_num分别为服务节点和训练节点的数量。在本例中,服务节点有1个,训练节点有两个。

fleetrun --server_num=1 --worker_num=2 ctr_app.py

使用InMemoryDataset/QueueDataset进行训练

注意

本教程目前不支持动态图,仅支持在paddle静态图模式下使用,paddle开启静态图模式

paddle.enable_static()

简介

为了能高速运行模型的训练,我们使用InMemoryDataset/QueueDatasetAPI进行高性能的IO,具体介绍可以参考文档InMemoryDatasetQueueDataset, 以下简称Dataset。Dataset是为多线程及全异步方式量身打造的数据读取方式,每个数据读取线程会与一个训练线程耦合,形成了多生产者-多消费者的模式,会极大的加速我们的模型训练。

本文以训练word2vector模型为例,在训练中引入基于Dataset API读取训练数据的方式,我们直接加载Fleetx预先定义好的word2vector模型,省去一切前期组网调试阶段,无需变更数据格式,只需在我们原本的训练代码中加入以下内容,便可轻松使用Dataset接口来进行训练。以下是使用Dataset接口一个比较完整的流程:

引入dataset

  1. 通过dataset = paddle.distributed.InMemoryDataset() 或者 dataset = paddle.distributed.QueueDataset()创建一个Dataset对象

  2. 指定dataset读取的训练文件的列表, 通过set_filelist配置。

  3. 通过dataset.init() api 进行Dataset的初始化配置,init()接口接收**kwargs参数, 详见api文档,列举几个配置的初始化

    1. 将我们定义好的数据输入格式传给Dataset, 通过use_var配置。

    2. 指定我们的数据读取方式,由my_data_generator.py实现数据读取的规则,后面将会介绍读取规则的实现, 通过pipe_command配置。pipe_command是Dataset特有的通过管道来读取训练样本的方式,通过set_filelist设置的训练样本文件将被作为管道的输入cat到管道中经过用户自定义的pipe_command最终输出。

    3. 指定数据读取的batch_size,通过batch_size配置。

    4. 指定数据读取的线程数,一般该线程数和训练线程应保持一致,两者为耦合的关系,通过thread_num配置。

dataset = paddle.distributed.InMemoryDataset()
batch_size = config.config["batch_size"]
thread_num = config.config["thread_num"]
dataset.init(use_var=model.inputs, pipe_command="python my_data_generator.py", batch_size=batch_size, thread_num=thread_num)
dataset.set_filelist([config.config["train_files_path"]])

如何指定数据读取规则

在上文我们提到了由my_data_generator.py实现具体的数据管道读取规则,那么,怎样为dataset创建数据读取的规则呢? 以下是my_data_generator.py的全部代码,具体流程如下: 1. 首先我们需要引入data_generator的类,位于paddle.distributed.fleet.data_generator。 2. 声明一些在数据读取中会用到的类和库,如示例代码中的NumpyRandomIntlogger等。 3. 创建一个子类Word2VecReader,继承fleet.data_generator的基类,基类有多种选择,如果是多种数据类型混合,并且需要转化为数值进行预处理的,建议使用MultiSlotDataGenerator;若已经完成了预处理并保存为数据文件,可以直接以string的方式进行读取,使用MultiSlotStringDataGenerator,能够进一步加速。在示例代码,我们继承并实现了名为Word2VecReader的data_generator子类,使用MultiSlotDataGenerator方法。 4. 继承并实现基类中的generate_sample函数,逐行读取数据。该函数应返回一个可以迭代的reader方法(带有yield的函数不再是一个普通的函数,而是一个生成器generator,成为了可以迭代的对象,等价于一个数组、链表、文件、字符串etc.) 5. 在这个可以迭代的函数中,如示例代码中的def nce_reader(),我们定义数据读取的逻辑。例如对以行为单位的数据进行截取,转换及预处理。

  1. 最后,我们需要将数据整理为特定的batch的格式,才能够被dataset正确读取,并灌入的训练的网络中。继承并实现基类中的generate_batch函数, 根据设定的’batch_size’, 该函数会在generator_sample函数产生样本数达到batch_size时,调用该函数内队逐条样本的处理逻辑,如示例代码中的def local_iter()

  2. 简单来说,数据的输出顺序与我们在网络中创建的inputs必须是严格一一对应的,并转换为类似字典的形式。在示例代码中,我们将参数名与数值构成的元组组成了一个list,并将其yield输出。如果展开来看,我们输出的数据形如[('input_word',[value]),('true_label',[value]),('neg_label',[value])]

import sys
import io
import os
import re
import collections
import time
import config
import logging

import paddle
import numpy as np
import paddle.distributed.fleet as fleet

logging.basicConfig(format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger("fluid")
logger.setLevel(logging.INFO)

class NumpyRandomInt(object):
    def __init__(self, a, b, buf_size=1000):
        self.idx = 0
        self.buffer = np.random.random_integers(a, b, buf_size)
        self.a = a
        self.b = b

    def __call__(self):
        if self.idx == len(self.buffer):
            self.buffer = np.random.random_integers(self.a, self.b,
                                                    len(self.buffer))
            self.idx = 0

        result = self.buffer[self.idx]
        self.idx += 1
        return result

class Word2VecReader(fleet.MultiSlotDataGenerator):
    def init(self,
             dict_path,
             nce_num,
             window_size=5):

        self.window_size_ = window_size
        self.nce_num = nce_num

        word_all_count = 0
        id_counts = []
        word_id = 0

        with io.open(dict_path, 'r', encoding='utf-8') as f:
            for line in f:
                word, count = line.split()[0], int(line.split()[1])
                word_id += 1
                id_counts.append(count)
                word_all_count += count

        self.word_all_count = word_all_count
        self.corpus_size_ = word_all_count
        self.dict_size = len(id_counts)
        self.id_counts_ = id_counts

        logger.info("corpus_size:", self.corpus_size_)
        self.id_frequencys = [
            float(count) / word_all_count for count in self.id_counts_
        ]
        logger.info("dict_size = " + str(self.dict_size) + " word_all_count = " + str(word_all_count))

        self.random_generator = NumpyRandomInt(1, self.window_size_ + 1)

    def get_context_words(self, words, idx):
        """
        Get the context word list of target word.
        words: the words of the current line
        idx: input word index
        window_size: window size
        """
        target_window = self.random_generator()
        start_point = idx - target_window  # if (idx - target_window) > 0 else 0
        if start_point < 0:
            start_point = 0
        end_point = idx + target_window
        targets = words[start_point:idx] + words[idx + 1:end_point + 1]
        return targets

    def generate_batch(self, samples):
        def local_iter():
            np_power = np.power(np.array(self.id_frequencys), 0.75)
            id_frequencys_pow = np_power / np_power.sum()
            cs = np.array(id_frequencys_pow).cumsum()
            result = [[], []]
            for sample in samples:
                tensor_result = [("input_word", []), ("true_label", []), ("neg_label", [])]
                tensor_result[0][1].extend(sample[0])
                tensor_result[1][1].extend(sample[1])
                neg_array = cs.searchsorted(np.random.sample(self.nce_num))

                tensor_result[2][1].extend(neg_array)

                yield tensor_result
        return local_iter



    def generate_sample(self, line):
        def nce_reader():

            word_ids = [int(w) for w in line.split()]
            for idx, target_id in enumerate(word_ids):
                context_word_ids = self.get_context_words(
                    word_ids, idx)
                for context_id in context_word_ids:
                    yield [target_id], [context_id]
        return nce_reader

if __name__ == "__main__":
    my_data_generator = Word2VecReader()
    my_data_generator.init(config.config["dict_path"], config.config["nce_num"])
    my_data_generator.set_batch(config.config["batch_size"])

    my_data_generator.run_from_stdin()

快速调试Dataset

我们可以脱离组网架构,单独验证Dataset的输出是否符合我们预期。使用命令 cat 数据文件 | python dataset读取python文件进行dataset代码的调试:

cat train_data/part_912 | python my_data_generator.py

输出的数据格式如下: input_word:size ; input_word:value ; true_label:size ; true_label:value ; neg_label:size ; neg_label:value

理想的输出为(截取了一个片段):

...
1 112 1 2739 5 6740 451 778 90446 3698
...

使用Dataset的一些注意事项 - Dataset的基本原理:将数据print到缓存,再由C++端的代码实现读取,因此,我们不能在dataset的读取代码中,加入与数据读取无关的print信息,会导致C++端拿到错误的数据信息。 - dataset目前只支持在unbuntuCentOS等标准Linux环境下使用,在WindowsMac下使用时,会产生预料之外的错误,请知悉。

数据准备

可以参考文档 的数据准备部分 完整数据下载以及预处理之后可以选取一个part的文件作为demo数据

mkdir demo_train_data
cp train_data/part_1 demo_train_data/

训练

我们把原来的训练代码:

trainer = X.CPUTrainer()
trainer.fit(model, loader, epoch=10)

替换成如下使用Dataset训练的流程, 我们以一个epoch为例:

import paddle
import paddle.fluid as fluid
import paddle.distributed.fleet as fleet
import config
# 开启paddle静态图模式
paddle.enable_static()

fleet.init()

model = X.applications.Word2vec()

"""
need config loader correctly.
"""

loader = model.load_dataset_from_file(train_files_path=[config.config["train_files_path"]], dict_path=config.config["dict_path"])

dist_strategy = fleet.DistributedStrategy()
dist_strategy.a_sync = True

optimizer = fluid.optimizer.SGD(learning_rate=0.0001)
optimizer = fleet.distributed_optimizer(optimizer, dist_strategy)
optimizer.minimize(model.loss)

if fleet.is_server():
    fleet.init_server()
    fleet.run_server()
else:
    place = paddle.CPUPlace()
    fleet.init_worker()
    exe = paddle.static.Executor(place)
    default_startup_program = paddle.static.Program()
    default_main_program = paddle.static.Program()
    scope1 = fluid.Scope()
    with fluid.scope_guard(scope1):
        exe.run(model.startup_prog)

    dataset = paddle.distributed.QueueDataset()
    batch_size = config.config["batch_size"]
    thread_num = config.config["thread_num"]
    dataset.init(use_var=model.inputs, pipe_command="python my_data_generator.py", batch_size=batch_size, thread_num=thread_num)
    dataset.set_filelist([config.config["train_files_path"]])

    with fluid.scope_guard(scope1):
        exe.train_from_dataset(model.main_prog, dataset, scope1, debug=False, fetch_list=[model.loss], fetch_info=["loss"], print_period=10)

    fleet.stop_worker()

最后添加上述代码使用的配置文件config.py

config = dict()

config["dict_path"] = "thirdparty/test_build_dict"
config["train_files_path"] = "demo_train_data/part_1"
config["batch_size"] = 1000
config["nce_num"] = 5
config["thread_num"] = 12

通过以上简洁的代码,即可以实现word2vector模型的多线程并发训练

Collective 同步训练实践

同步训练简介

许多研究表明深度学习的预训练受益于更多的数据[1] [2] [3],但更大的数据量也意味着更长的训练耗时,数据并行同步训练是一种加速大规模数据训练的方法,有PServerCollective两种模式。

同步训练通过数据划分,将计算工作量(前向、反向)分布到GPU 集群中的每一个worker上, 提高整体计算吞吐。但参数更新(update) 的过程在两种模式中有所不同:

  • PServer模式中,会启动多个pservers 和多个trainers,每个pserver会保存一部分模型参数,并负责接收从trainer发送的梯度并更新这些模型参数;每个trainer 会保存一份完整的模型,并使用一部分数据进行训练,然后向pserver发送梯度,最后从pserver拉取更新后的参数。 pserver进程和trainer可以在不同的计算节点上,也可以在同一公用节点。一个分布式任务所需要的pserver进程个数通常需要根据实际情况调整,以达到最佳的性能,然而通常来说pserver的进程不会比trainer更多。

PServe
  • Collective模式中,集群中只存在多个地位平等的trainers。 每个trainer进程都保存一份完整的模型参数。 前向和反向中每个 trainer 使用自己划分 (shard)的数据进行计算,得到对应的梯度;之后trainers 之间通过 allreduce 等 Collective 通信方式[4] 同步梯度到所有trainers,最后每个 trainer 使用同步后的梯度独立完成参数更新。

Collective

相交于异步训练, 同步训练的的优势在于Loss可以比较稳定的下降,缺点是整体速度的快慢取决于最慢的trainer. 因此在训练较为复杂的模型时,即模型训练过程中神经网络训练耗时远大于节点间通信耗时的场景下,推荐使用同步训练模式。

Fleet中 PServer模式使用 gRPC 通信,Collective模式使用 NCCL2 通信。

下文将由三部分组成:

  • 介绍 Fleet 同步训练中常用的几个策略 、优化

  • 结合上述常用优化,给出一个在 4节点 32 V100 集群 训练 ResNet50的示例代码

  • 完整 Fleet 同步训练参数策略介绍

Fleet Collective 同步训练优化

Fleet 支持在 GPU (CUDA 版本 >= 7.5) 服务器集群上完成高性能分布式训练。 用户可以通过 fleet.DistributedStrategy 设置许多与训练性能策略相关参数。目前Fleet 为这些参数提供了一个较通用默认值,用户可以不去调整。但如果用户希望针对性调优分布式训练的性能,可以根据自身硬件和任务设置对应参数。

在进行性能优化时, 检查每项优化点并验证对应提升,最终获得最优性能。 一个简单的验证当前的训练程序是否需要进一步优化性能的方法, 是查看GPU的计算利用率,通常用 nvidia-smi 命令查看。 如果GPU利用率较低,则可能存在较大的优化空间。

下文将介绍对性能影响较大,设置频率比较高的几个参数,详细的参数列表放在文末的附录中。

注意: 使用NCCL2模式分布式训练时,需要确保每个节点训练等量的数据,防止在最后一轮训练中任务不退出。通常有两种方式:

  • 随机采样一些数据,补全分配到较少数据的节点上。(推荐使用这种方法,以训练完整的数据集)。

  • 在python代码中,每个节点每个pass只训练固定的batch数,如果这个节点数据较多,则不训练这些多出来的数据。

OP融合

将模型网络中顺序执行的多个OPs进行融合能够减少OP 调度的开销,提升训练速度。目前Fleet 中支持如下3种的OP 融合:

  • fuse_all_optimizer_ops:表明是否融合(fuse) 是否融合 optimizer_op,仅对部分 optimizer 可用(SGD、Adam和Momentum)。

  • fuse_elewise_add_act_ops:表明是否融合(fuse) elementwise_add_op和activation_op。

  • fuse_bn_act_ops:表明是否融合(fuse) batch_norm_op 和 activation_op。

通常使用这些策略都会使整体执行过程更快。

dist_strategy = fleet.DistributedStrategy()
dist_strategy.fuse_all_optimizer_ops = True
dist_strategy.fuse_bn_act_ops = True
dist_strategy.fuse_elewise_add_act_ops = True

AllReduce融合

AllReduce 融合默认情况下会将同一layer中参数的梯度的多个AllReduce操作合并成一个。 比如对于 fluid.layers.fc 中有Weight和Bias两个参数,打开该选项之前,需要两次AllReduce操作;打开该选项之后,只用一次AllReduce 操作。这样可以减少梯度同步时的通信耗时。

此外,为支持更大粒度的参数梯度融合,Fleet 提供了以下两个选项,用户可以在训练程序运行前在DistributedStrategy中设置:

  • fuse_grad_size_in_MB: 指定每个AllReduce操作的梯度字节数,如该参数等于16 则每次AllReduce调用传输16MB的梯度。 该参数的经验值为总通信量的十分之一。

  • fuse_grad_size_in_TFLOPS: 指定每次AllReduce操作的最大层数,即到达该层数就进行AllReduce。如该参数等于50, 则最多每50层做一次 fused AllReduce。

注意: AllReduce融合目前不支持sparse参数梯度。

dist_strategy = fleet.DistributedStrategy()
dist_strategy.fuse_grad_size_in_MB=16
dist_strategy.fuse_grad_size_in_TFLOPS=50
dist_strategy.fuse_all_reduce_ops=True

分层 AllReduce

对于多机模式,针对小数据量的通信,Ring AllReduce通信效率低,采用Hierarchical AllReduce可以缓解这一问题。 分层AllReduce 运行如下图所示:

分层 AllReduce
dist_strategy = fleet.DistributedStrategy()
dist_strategy.use_hierarchical_allreduce = True
dist_strategy.hierarchical_allreduce_inter_nranks = 8

使用同步Allreduce

Fleet 使用多进程+NCCL2模式(collective)以获得更好的性能。 在多进程模式下,每台服务器的每个GPU卡都会对应启动一个训练进程, 集群中的所有进程之间会互相通信完成训练。以此方式最大限度的降低进程内部资源抢占的开销。

dist_strategy.sync_nccl_allreduce=True

设置合适的nccl通信器数量

nccl通信器数量 nccl_comm_num 可以加快GPU之间的通信效率,建议单机设置为1,多机设置为2。

dist_strategy = fleet.DistributedStrategy()
dist_strategy.nccl_comm_num = 2

设置合适的CPU线程数

PaddlePaddle Fluid使用“线程池” [5] 模型调度并执行Op,Op在启动GPU计算之前, 通常需要CPU的协助,然而如果Op本身占用时间很小,“线程池”模型下又会带来额外的调度开销。

根据以往的经验,对于CPU任务,num_threads=2 * dev_count 时性能较好,对于GPU任务,num_threads=4 * dev_count 时性能较好。注意:线程池不是越大越好。

dist_strategy = fleet.DistributedStrategy()
dist_strategy.thread_num = 3

提高网络的吞吐

多节点训练时网络的带宽常常成为训练的瓶颈。我们在实测中发现,当使用自动混合精度训练后,TCP socket 的通信方式将成为训练速度的瓶颈, 使多节点训练无法充分利用 FLeet 混合精度计算带来的速度提升。 在我们实测中使用: 100Gb 网卡,RDMA[7]InfiniBand[8]来提升网络带宽,使网络传输不会成为计算速度的瓶颈。 在开始训练前,需要正确设置以下 NCCL 环境变量使对应硬件设置生效:

Env Name

Description

NCCL_SOCKET_IFNAME

The RDMA device, e.g. eth2

NCCL_P2P_DISABLE

Set to 1 to disable P2P transfer between GPUs

NCCL_IB_DISABLE

Set to 1 to disable using RDMA

NCCL_IB_CUDA_SUPPORT

Set to 1 to enable GPU Direct if supported

NCCL_DEBUG

Set debug level: VERSION, WARN, INFO

预先分配足够的显存

通过环境变量 FLAGS_fraction_of_gpu_memory_to_use=0.7 设置预先分配的显存占比。 由于CUDA原生的显存分配cuMalloc和释放cuFree操作均是同步操作,非常耗时,因此 通过 设置 FLAGS_fraction_of_gpu_memory_to_use 成一个较大的值,比如0.7,可以显著地加速训练的速度。

0.7 是指 70%的显存会预先分配。设置的范围是0.0~1.0。

os.environ['FLAGS_fraction_of_gpu_memory_to_use'] = "0.98"

降低scope drop频率和fetch频率

减少scope drop和fetch频率,可以减少频繁的变量内存申请、释放和拷贝, 从而提升性能。

# 每 30 batch 之后清理一次临时变量
dist_strategy = fleet.DistributedStrategy()
dist_strategy.BuildStrategy = {'num_iteration_per_drop_scope': 30}

# 降低fetch频率,每 30 batch fetch 一次训练输出
for pass_id in xrange(PASS_NUM):
    batch_id = 0
    while True:
        if batch_id % 30 == 0:
            fetched = exe.run(fetch_list)
        else:
            exe.run([])

增大batch_size

分布式同步训练,跨节点通信或多或少会带来性能影响,增大训练的batch_size, 可以保持通信开销不变的情况下,增大计算吞吐从而降低通信在整个训练过程中的占比来提升总体的训练吞吐。

使用 DALI reader

数据读取的优化在GPU训练中至关重要,尤其在不断增加batch_size提升吞吐时,数据reader 可能成为训练速度的瓶颈。 Fleet 中可以使用 Nvidia DALI6 作为数据loader. 使用DALI的优点有:

  • 使用GPU完成部分数据预处理,加速数据读取过程,减少 CPU 负担。

  • DALI 提供预取队列(perfetch queue)功能,让数据预处理和模型计算可以异步进行,减少模型计算对数据读取的等待。

import fleetx as X
model = X.applications.Resnet50()
loader = model.load_imagenet_from_file("/pathto/imagenet/train.txt", use_dali=True)

使用混合精度训练

V100 GPU提供了 Tensor Core 可以在混合精度计算 场景极大的提升性能。使用混合精度计算的例子可以参考文档 ` <https://todo/>`__

目前Paddle只提供在两个模型(ResNet, BERT)的混合精度计算实现并支持static loss scaling,其他模型使用混合精度也 可以参考以上的实现完成验证。

ResNet50训练示例

试验开始前我们已经在GPU 集群中提前配置好 RDMAInfiniBand,减少网络通信的瓶颈,配置细节和具体硬件相关,可以参考`[rdma-x] <https://community.mellanox.com/s/article/what-is-rdma-x>`__

设置 AllReduce融合等参数

梯度融合中的16 和 50 是我们根据自身网络硬件和ResNet50 训练试验得出的经验值,用户可以根据自身硬件和模型进行调整。 0.7 是为了给 DALI loader 提前预留显存空间。

import os
os.environ['FLAGS_fuse_parameter_memory_size'] = "16"
os.environ['FLAGS_fuse_parameter_groups_size'] = "50"
os.environ['FLAGS_fraction_of_gpu_memory_to_use'] = "0.7"

添加依赖

import os
import fleetx as X
import paddle
import paddle.fluid as fluid
import paddle.distributed.fleet.base.role_maker as role_maker
import time
import paddle.distributed.fleet as fleet

定义分布式模式并初始化模型和reader

这里我们使用DALI reader 减少CPU 数据处理负担和数据读取瓶颈。

paddle.enable_static()
configs = X.parse_train_configs()
fleet.init(is_collective=True)

model = X.applications.Resnet50()
downloader = X.utils.Downloader()
local_path = downloader.download_from_bos(
    fs_yaml='https://fleet.bj.bcebos.com/test/loader/small_imagenet.yaml',
    local_path='./data')
batch_size = 32
loader = model.get_train_dataloader(local_path, batch_size=batch_size)

定义分布式相关策略

这里我们会开启上文中提到的各项训练优化策略,如:自动混合精度计算,OP 融合等。

dist_strategy = fleet.DistributedStrategy()

# distributed strategy
dist_strategy.sync_nccl_allreduce = True
dist_strategy.nccl_comm_num = 2
dist_strategy.fuse_all_reduce_ops = True

# build strategy
build_strategy = fluid.BuildStrategy()
build_strategy.enable_sequential_execution = True
build_strategy.fuse_elewise_add_act_ops = True
build_strategy.fuse_bn_act_ops = True
build_strategy.enable_auto_fusion = True
build_strategy.fuse_all_optimizer_ops = True
dist_strategy.build_strategy = build_strategy


# execute strategy
execution_strategy = fluid.ExecutionStrategy()
execution_strategy.num_threads = 3
execution_strategy.num_iteration_per_drop_scope = 100
execution_strategy.num_iteration_per_run = 1
dist_strategy.execution_strategy = execution_strategy

# amp
dist_strategy.amp = True
dist_strategy.amp_configs = {
    "init_loss_scaling": 128,
    "decr_every_n_nan_or_inf": 2,
    "incr_every_n_steps": 1000,
    "incr_ratio": 2.0,
    "use_dynamic_loss_scaling": True,
    "decr_ratio": 0.5,
    "custom_white_list": [],
    "custom_black_list": [],
}

dist_strategy.save_to_prototxt("dist_strategy.prototxt")

开始训练

optimizer = fluid.optimizer.Momentum(learning_rate=0.01, momentum=0.9)
optimizer = fleet.distributed_optimizer(optimizer, dist_strategy)
optimizer.minimize(model.loss)

place = fluid.CUDAPlace(int(os.environ.get('FLAGS_selected_gpus', 0)))
exe = fluid.Executor(place)
exe.run(fluid.default_startup_program())

for i, data in enumerate(loader()):
    start_time = time.time()
    cost_val = exe.run(model.main_prog,
                        feed=data,
                        fetch_list=[model.loss.name])

    end_time = time.time()
    print(
        "worker_index: %d, step%d cost = %f, speed: %f"
        % (fleet.worker_index(), i, cost_val[0], batch_size / (end_time - start_time)))

Fleetrun 一键启动

"xx.xx.xx.xx” 等是四个节点ips,每个节点 8 张 GPU卡, 共 32 GPU 并行训练。

fleetrun --ips="xx.xx.xx.xx, yy.yy.yy.yy, aa.aa.aa.aa, bb.bb.bb.bb" --gpus=0,1,2,3,4,5,6,7 example_collective.py

# worker_index: 0, step0 cost = 7.147776, speed: 34.481360
# worker_index: 0, step1 cost = 7.151375, speed: 408.405991
# worker_index: 0, step2 cost = 7.025396, speed: 509.624355
# worker_index: 0, step3 cost = 6.501647, speed: 533.641315
# worker_index: 0, step4 cost = 6.759287, speed: 520.999193
# worker_index: 0, step5 cost = 6.266363, speed: 536.729215
# worker_index: 0, step6 cost = 6.243353, speed: 522.510241
# worker_index: 0, step7 cost = 6.923586, speed: 519.478763
# worker_index: 0, step8 cost = 7.607512, speed: 534.919526
# worker_index: 0, step9 cost = 7.111218, speed: 508.371600

Fleet 训练策略

DistributedStrategy

Dist ributedStrategy

类型

默认值

定义

auto

bool

False

自动 化框架参数优化

a_sync

bool

True

指示 是否使用异步SGD 进行参 数更新,仅在PS erver模式中生效

sync _nccl_allreduce

bool

True

指示是 否在每个通信线 程中中使用同步 allre duce,仅在Colle ctive模式中生效 ,通常在使用同 步allreduce后系 统的开销会降低

nccl_comm_num

int

1

nccl通信器数量. nccl通信器数量 nccl_comm_num 可以加快GPU之 间的通信效率, 建议单机设置为 1,多机设置为2 。针对CPU线程数 num_threads ,建议单机设置 为1,多机设置为 nccl_comm_num +1

use_hierarc hical_allreduce

bool

False

分级式allred uce,对于多机模 式,针对小数据 量的通信,Ring AllReduc e通信效率低,采 用Hierarchical AllReduce可 以解决该问题。

hiera rchical_allredu ce_inter_nranks

int

1

在 分级式allreduc e,低层级groups 中的 r ank数。一般等于 单个GPU节点中的 GPU数

sync_batch_norm

bool

False

表示是否使 用同步的批正则 化,即在训练阶 段通过多个设备 同步均值和方差 。当前的实现不 支持FP16训练和C PU。并且目前* 仅支持*仅在 一台机器上进行 同步式批正则。

fuse _all_reduce_ops

bool

True

默认情况下会 将同一layer中参 数的梯度的AllR educe操作合并成 一个,比如对于 fluid.layers.fc 中有 Weight和Bias两 个参数,打开该 选项之后,原本 需要两次AllRed uce操作,现在只 用一次AllReduce 操作。

fuse_ grad_size_in_MB

int

32

每个AllReduce操 作的梯度字节数

fuse_grad _size_in_TFLOPS

int

20

指 定每次AllReduc e操作的最大层数 ,即到达该层数 就进行AllReduce

cudnn_ex haustive_search

bool

True

表示是 否使用穷举搜索 方法来选择卷积 算法。在cuDNN中 有两种搜索方法 ,启发式搜索和 穷举搜索。穷举 搜索尝试所有cu DNN算法以选择其 中最快的算法。 此方法非常耗时 ,所选择的算法 将针对给定的层 规格进行缓存。 一旦更改了 图层规格(如bat ch大小,feature map大小), 它将再次搜索。

conv_works pace_size_limit

int

4000

用 于选择cuDNN卷积 算法的工作区限 制大小(单位为 MB)。cuDNN的内 部函数在这个内 存限制范围内获 得速度最快的匹 配算法。通常, 在较大的工作区 内可以选择更快 的算法,但同时 也会显著增加内 存空间。用户需 要在内存和速度 之间进行权衡。

cudn n_batchnorm_spa tial_persistent

bool

True

表示是否在 batchnorm中使用 新的批量标准化 模式CUDNN_BATC HNORM_SPATIAL_P ERSISTENT函数。

BuildStrategy

BuildStrategy

类型

默认值

定义

enable_seque ntial_execution

bool

False

如果 设置为True,则 算子的执行顺序 将与算子定义的 执行顺序相同。

fuse_elew ise_add_act_ops

bool

False

表明 是否融合(fuse) elementwise_add _op和activation _op。这会使整体 执行过程更快。

fuse_bn_act_ops

bool

False

表明 是否融合(fuse) batch_norm_op 和 activation _op。这会使整体 执行过程更快。

fuse_relu _depthwise_conv

bool

False

表明 是否融合(fuse) relu和 depthwise_conv 2d,节省GPU内存 并可能加速执行 过程。此选项仅 适用于GPU设备。

fus e_broadcast_ops

bool

False

表明 是否融合(fuse) broadcast ops。 该选项指在Reduc e模式下有效,使 程序运行更快。

fuse_al l_optimizer_ops

bool

False

表明 是否融合(fuse) 是否融合 optimiz er_op,仅对部分 optimizer 可用 (SGD、Adam和M omentum),可使 程序运行更快。

enable_inplace

bool

False

表明是 否Op的输出复用O p输入的显存空间 ,优化显存占用

ena ble_backward_op timizer_op_deps

bool

True

在反向操作 和参数更新操作 之间添加依赖, 保证在所有的反 向操作都运行结 束之后才开始运 行参数更新操作. 在 多卡训练时,打 开该选项可能会 提升训练速度。

cache_ runtime_context

bool

False

unkown

ExecutionStrategy

Ex ecutionStrategy

类型

默认值

定义

num_threads

int

1

表示当前 Executor 的线程池(thread pool)的大小, 此线 程池可用来并发 执行program中的 operator(算子 ,运算)。如果 num_threads=1 ,则所有 的operator将一 个接一个地执行 ,但在不同的pro gram重复周期(it erations)中执行 顺序可能不同。

num_iteration _per_drop_scope

int

10

该选项表 示间隔多少次迭 代之后清理一次 临时变量。模型 运行过程中,生 成的中间临时变 量将被放到local execution scope中,为了 避免对临时变量 频繁的申请与释 放,通常将其设 为较大的值(比 如10或者100)。

num_it eration_per_run

int

3

它配置了当用户 在python脚本中 调用pe.run()时 执行器会执行的 迭代次数。Execu tor每次调用,会 进行num_iterat ion_per_run次训 练,它会使整体 执行过程更快。

use _thread_barrier

bool

False

当使用 PServer 模式时为 True

自动混合精度练加速分布式训练

简介

在使用数据并行分布式训练的同时, 我们还可以引入自动混合精度(Auto Mixed Precision) 来进一步提升训练的速度.

主流的神经网络模型通常使用单精度 single-precision (FP32) 数据格式来存储模型参数、进行训练和预测. 在上述环节中使用半精度 half-precision (FP16)来代替单精度. 可以带来以下好处:

  1. 减少对GPU memory 的需求: GPU 显存不变情况下, 支持更大模型 / batch size

  2. 降低显存读写时的带宽压力

  3. 加速GPU 数学运算速度 (需要GPU 支持[1])

  4. GPU上 FP16 吞吐是FP32 的 2 - 8 倍[2]

Paddle 支持自动混合精度计算, 并实现了 自动维护FP32 、FP16参数副本, Dynamic loss scaling, op黑白名单 等策略来避免 因 FP16 动态范围较小而带来的模型最终精度损失。 Fleet 作为Paddle通用的分布式训练API提供了简单易用的接口, 用户只需要添加几行代码 就可将自动混合精度应用到原有的分布式训练中进一步提升训练速度.

下文将通过一个简单例子介绍如如何通过 Fleet将实现混合精度的分布式训练, 另外给出我们使用 Fleet 进行同步训练加速的实践。

试验效果

环境: 4 机 32卡 V100-32GB

imagenet

单卡 batch size

速度 img/s

top1

`VGG16-FP32

32

4133

55.4%

`VGG16-AMP

32

7238

54.6%

imagenet

单卡 batch size

速度 img/s

top1

`Resnet50-FP32

128

8410

76.3%

`Resnet50-AMP

128

25591

76.0%

`Resnet50-FP32

256

OOM

OOM

`Resnet50-AMP

256

29440

76.0%

AMP 快速开始

这里以在单机多卡上训练Resent50 为简单例子介绍Fleet 中 AMP的用法.

自动混合精度原理

FP32 参数副本及更新
weight 副本

如上图所示, 在AMP 中, 模型参数 weight , 前向中间的结果activation, 反向的gradient 都以FP16 形式存储, 由此可以减少模型占用的显存空间,同时提高计算和通信速度,也就是使得训练吞吐更大,训练更快. Paddle框架会为每一个weight 维护一个FP32副本, 用于参数更新.

Loss scaling
weight 分布

如上图所示, 实际情况中模型训练中的某些变量, 比如grad (特别是 activationgrad), 可能会因小于 FP16的精度低而变成0;

另一方面在FP16 的表示范围的中有很大的一部分(从最大值往左) 却没有被利用到.

对gradient 做一个整体的放大, 能够更充分的利用FP16 的表示范围.

Fleet AMP 会在反向开始前对 loss 进行 up scaling, 并在执行任何梯度相关操作(e.g. gradient-clip, update) 之前对 gredient 进行 down scaling 恢复原来的大小.

scaling factor 的设置是 Lossing scaling 的关键, Fleet AMP 提供 Dynamic loss scaling (默认) 和 Constant loss scaling 两种scaling 策略:

  • Constant loss scaling: 设置 use_dynamic_loss_scaling = Falseinit_loss_scaling (float)

  • Dynamic loss scaling: scaling 中面临的问题是当scaling up 不足时, 仍会有部分较小变量会被表示成 0而损失精度; 当scaling up 过度时, 变量超过FP16表示范围出现 nan or inf, 同样造成精度损失. 此策略采用自动 gradient 值检测的方式:

    • 当连续incr_every_n_steps(int)个batch 中所有的gradient 都在FP16 的表示范围, 将scaling factor 增大incr_ratio(float)倍;

    • 当有连续decr_every_n_nan_or_inf(int)个batch 中gradient 里出现 nan / inf时, scaling factor 缩小 decr_ratio(float)倍.

    • 上述四个参数Fleet 提供的默认值可以满足绝大部分要求, 用户通常不需要修改.

如下图所示在 Dynamic loss scaling 中,框架在每一个 iteration 都会依据当前 gradients 是否出现 nan or inf 还有用户设置的 Dynamic loss scaling 参数来动态调整 loss scaling factor 的大小,将gradient 尽量保持在 FP16 的表示范围之内。

Dynamic loss scaling
OP 黑白名单

模型中的某些Operation (OP) 可能对精度较为敏感, 为了确保AMP 中精度无损, 可以通过OP 黑白名单对具体OP 操作的精度做指定.

  • 白名单: OP 操作在FP16精度下进行, input: 如果不是FP16 会被首先cast 成FP16后再输入OP. output: FP16

  • 黑名单: OP 操作在FP32精度下进行, input: 如果不是FP32 会被首先cast 成FP32后再输入OP. output: FP32

  • 灰名单: 所有不在黑或白名单里的OP. 仅当OP 所有 inputs 都是 FP16精度时, 操作才在FP16精度下进行, 否着以FP 32进行. input / output: 和原始输入中的最高精度相同

Fleet 已经预设了一个能够覆盖绝大多数模型OPs的黑白名单, 通常情况下用户并不需要修改, 但是如果任务对精度有特殊要求, 或者希望新增自定义 OP, 用户可以通过 paddle.distributed.fleet.DistributedStrategy.amp_configs 中的 custom_white_listcustom_black_list 进行指定. 同是, 用户还可以通过custom_black_varnames, 来具体指定Paddle program 某一个 var必须使用FP32精度.

我们将在文末的 appendix中 进一步介绍 Fleet 的黑白名单设置及其影响。

开始训练

添加依赖

首先我们要导入依赖和定义模型和 data loader, 这一步和Fleet 下其他任务基本一致.

import os
import fleetx as X
import paddle
import paddle.fluid as fluid
import paddle.distributed.fleet.base.role_maker as role_maker
import time
import paddle.distributed.fleet as fleet
定义分布式模式并初始化
paddle.enable_static()
configs = X.parse_train_configs()
fleet.init(is_collective=True)
加载模型及数据
model = X.applications.Resnet50()
downloader = X.utils.Downloader()
local_path = downloader.download_from_bos(
    fs_yaml='https://fleet.bj.bcebos.com/test/loader/small_imagenet.yaml',
    local_path='./data')
batch_size = 32
loader = model.get_train_dataloader(local_path, batch_size=batch_size)
定义分布式及AMP 相关策略

如上文描述, 用户可以选择设置 Loss scalingOP黑白名单等的参数.

另外 Fleet 将AMP 实现为 meta optimizer, 用户需要指定其的 inner-optimizer. Fleet AMP支持所有 paddle optimziers 和 FLeet meta otpimizers 作为其 inner-optimizer.

dist_strategy.amp = True
dist_strategy.amp_configs = {
    "init_loss_scaling": 32768,
    "decr_every_n_nan_or_inf": 2,
    "incr_every_n_steps": 1000,
    "incr_ratio": 2.0,
    "use_dynamic_loss_scaling": True,
    "decr_ratio": 0.5,
    "custom_white_list": [],
    "custom_black_list": [],
}

optimizer = fluid.optimizer.Momentum(learning_rate=0.01, momentum=0.9)
optimizer = fleet.distributed_optimizer(optimizer, dist_strategy)
optimizer.minimize(model.loss)
开始训练

这一部分和Fleet 中其他任务基本相同:

place = fluid.CUDAPlace(int(os.environ.get('FLAGS_selected_gpus', 0)))
exe = fluid.Executor(place)
exe.run(fluid.default_startup_program())

for i, data in enumerate(loader()):
    start_time = time.time()
    cost_val = exe.run(model.main_prog,
                        feed=data,
                        fetch_list=[model.loss.name])

    end_time = time.time()
    print(
        "worker_index: %d, step%d cost = %f, speed: %f"
        % (fleet.worker_index(), i, cost_val[0], batch_size / (end_time - start_time)))

运行训练脚本

一行启动单机多卡分布式训练:

fleetrun --gpus 0,1,2,3,4,5,6,7 --log_dir log example_amp.py

# worker_index: 0, step0 cost = 6.895311, speed: 12.192901
# worker_index: 0, step1 cost = 6.964077, speed: 412.116618
# worker_index: 0, step2 cost = 7.049311, speed: 433.850506
# worker_index: 0, step3 cost = 7.006689, speed: 358.400410
# worker_index: 0, step4 cost = 7.000206, speed: 398.210745
# worker_index: 0, step5 cost = 7.088611, speed: 462.322357
# worker_index: 0, step6 cost = 7.022367, speed: 425.185013

Fleet 黑白名单设置

上文简要介绍了Fleet 中黑白名单的 API 接口, 下文将进一步介绍 Fleet 中黑白名单的实现和可能对训练造成影响。 目前 Fleet 中 AMP 的默认黑白名单如下, 其他未列出的 op 都属于灰名单:

white_list = {
    'conv2d',
    'matmul',
    'mul',
}
black_list = {
    'exp',
    'square',
    'log',
    'mean',
    'sum',
    'cos_sim',
    'softmax',
    'softmax_with_cross_entropy',
    'sigmoid_cross_entropy_with_logits',
    'cross_entropy',
    'cross_entropy2',
}
黑白名单设置

白名单中只有卷积和乘法运算,这样的设置能够满足大部分的 CV 场景的模型加速(Vgg、ResNet), 因为卷积计算占据这些模型计算和内存访问开销的很大一部分, 其他 ops 的开销只占很小一部分。 对于 主要开销在 RNN 计算的 NLP 模型,目前的 AMP 实现提速并不是很明显。

黑名单中的 op 可以分为3 大类: * 对精度非常敏感的 op: softmaxcross_entropy 等。 * 输出相对于输入有更大动态范围的op(f(x) >> x):expsquare, log 等。 * reduce 类型的op: meansum 等。 所以,用户希望判断新的自定义op是否需要加入黑名单时,可以参考上述3个类型。

需要注意: 一些常用的 op 如 BatchNormpoolingrelu 属于灰名单,这意味着这些 op 的数据类型决定于之前的 op 的类型; 另外并行分布式计算使用 AMP之后,gradient-allreduce 是在FP16 中进行的。

自动化op 插入

在训练开始前,框架会根据黑白名单在前向和反向网络自动插入 cast op, 如: * 前向中插入 FP32toFP16 cast, 将 FP32 的layer parameter 副本 cast 成 FP16, 进行 FP16 conv 计算。 * 反向中插入 FP16toFP32 cast, 将等到的 FP16 gradient cast 成 FP32, 然后更新 FP32 的parameter 副本。

cast op 虽然会带来额外的开销, 但是在诸如 Vgg、ResNet 等主要由重复的 conv layer 串行的而成 CV 模型中, 只需要cast input 和 每一层的param,并不需要cast 模型的中间结果,这样 cast 操作带来的开销较少, 容易倍半精度计算带来的加速覆盖;但是如果模型的串行 layers 序列中存在较多的黑名单 op(e.g. conv --> log --> conv --> square --> conv), 这样模型的中间结果需要进行多次 FP32toFP16 和 FP16toFP32 cast, cast 开销将会急剧增大,从而抵消半精度带来的加速。

可能不适用 AMP 加速的情况
  • RNN 为主的 NLP 模型

  • 模型组网中有较多黑名单 op 的模型

  • 对数据精度敏感的任务(Adversarial Attacking in ML)

图像 Input Layout 格式

CV 模型训练时了达到最佳速度,不同场景下推荐使用不同图像 Layout

  • FP32:NCHW

  • 自动混合精度: NHWC

# when build dataloader
loader = model.load_imagenet_from_file("./ImageNet/train.txt",
                                        batch_size=args.batch_size,
                                        data_layout="NHWC")

# when build model
if data_format == "NHWC":
    img_shape = [None, 224, 224, 3]
else:
    img_shape = [None, 3, 224, 224]
image = fluid.data( name="feed_image", shape=img_shape, dtype="float32", lod_level=0)
conv = fluid.layers.conv2d(input=input, data_format= "NHWC")

推荐阅读:

如果需要对自动混合精度做定制化修改,或更深入理解AMP中原理和实现推荐阅读:

使用超大Batch进行训练

简介 + strategy列表

为了追求模型的性能不断提升,人们对更大规模的数据集、更深的网络层、更庞大的参数规模应运而生。但是随之而来的就是给模型训练带来了巨大的压力,因此分布式技术及定制化AI芯片应运而生。但在分布式训练中,经常会遇到显存或者内存不足的情况,通常是以下几点原因导致的:

  • 输入的数据过大,例如视频类训练数据。

  • 深度模型的参数过多或过大,所需的存储空间超出了内存/显存的大小。

  • AI芯片的内存有限。

为了能正常完成训练,我们通常只能使用较小的Batch Size以降低模型训练中的所需要的存储空间,这将导致很多模型无法通过提高训练时的Batch Size来提高模型的精度。为了解决这个问题,Fleet中提供了两种策略,使得模型可以使用超大Batch的方式完成训练:

  • Forward Recomputation Backpropagation(FRB): 通过清除正向计算过程中的中间计算结果,来降低训练过程中使用的存储空间,从而确保硬件有足够的内存做更大Batch Size的训练。

  • Gradient Merge: 在训练过程中,将连续多个Batch数据训练得到的梯度合并更新模型参数的策略。在该训练策略下,虽然从形式上看依然是小Batch规模的数据在训练,但是效果上可以达到多个小Batch数据合并成大Batch后训练的效果。

原理

Forward Recomputation Backpropagation

我们知道,深度学习网络的一次训练迭代包含三个步骤:

  • 前向计算: 运行前向算子(Operator) 来计算中间隐层(Variable)的值 。

  • 反向计算: 运行反向算子来计算参数(Parameter)的梯度。

  • 优化: 应用优化算法以更新参数值 。

在前向计算过程中,前向算子会计算出大量的中间结果,由于这些中间结果是训练数据和算子计算得到的,所以训练数据的Batch Size越大,中间结果占用的内存也就越大。飞桨核心框架会使用 Variable来存储这些隐层的中间结果。当模型层数加深时,其中间结果的数量可达成千上万个, 占据大量的内存。虽然飞桨核心框架的显存回收机制会及时清除无用的中间结果,以节省存储。 但是有些中间结果是反向计算过程中算子的输入,这些中间结果必须存储在内存中,直到相应的反向算子计算完毕。

对于大小固定的内存来说,如果用户希望使用大Batch Size的数据进行训练,则将导致单个中间结果占用内存增大,那么就需要减少中间结果的存储数量,FRB就是基于这种思想设计的。

FRB是将深度学习网络切分为k个部分(segments)。对每个segment而言:前向计算时,除了小部分必须存储在内存中的Variable外,其他中间结果都将被删除;在反向计算中,首先重新计算一遍前向算子,以获得中间结果,再运行反向算子。简而言之,FRB和普通的网络迭代相比,多计算了一遍前向算子。

我们把切分网络的变量叫做checkpoints。 那么问题来了,如何选择checkpoints呢?自从FRB方法提出以来,大量学者在研究这一关键问题。 我们知道深度学习网络通常是由一个个模块串联得到的,比如ResNet-50由16个block串联而成, Bert-Large由24个transformer串联而成,以两个子模块中间的变量作为切分点就是一个很好的选择。 对于非串联的网络(比如含有大量shortcut结构的网络),FRB也支持对其做切分, 只是可能多耗费一点内存(用于存储shortcut的Variable)。

Gradient Merge

与FRB相比,Gradient Merge并没有像FRB那样对内存的使用做出大刀阔斧般的改动,只是在训练流程上做了一些微调,达到模拟出大Batch Size训练效果的目的。具体来说,就是使用若干原有大小的Batch数据进行训练,即通过“前向+反向” 网络计算得到梯度。其间会有一部分显存/内存用于存放梯度,然后对每个Batch计算出的梯度进行叠加,在计算完所有Batch后,使用累加的梯度对模型进行参数更新,从而达到使用大Batch数据训练的效果。

GradientMerge 策略在使用方面也很简单,用户只需要定义将多少Batch的数据计算出的梯度叠加更新模型参数,便可以实现大Batch训练的目的。

操作实践

该章节中我们将基于BERT模型的实用样例,分别对这两个增大Batch的策略进行讲解。从整体来看,训练脚本的编写主要分为4个部分:

  • 添加训练脚本运行所必须的依赖包。

  • 定义分布式模式并初始化。

  • 加载模型及数据。

  • 定义训练策略和优化器,在这一步我们可以选择使用FRB或者Gradient Merge策略来增大BatchSize。

下面我们来分别介绍FRB和Gradient Merge两种策略所对应脚本的编写方法(bert_recompute.py 及 bert_gradient_merge.py)。

Forward Recomputation Backpropagation

添加依赖

首先我们需要添加训练中所用到的python模块,fleetx 可以用于加载我们为用户封装的接口如:加载模型及数据,模型训练等。paddle.distributed.fleet 中定义了丰富的分布式策略供用户使用。

# -*- coding: UTF-8 -*-
import paddle
import fleetx as X
import paddle.fluid as fluid
import paddle.distributed.fleet as fleet
定义分布式模式并初始化

通过X.parse_train_configs()接口,用户可以定义训练相关的参数,如:学习率、衰减率等。同时通过fleet.init()接口定义了分布式模型,下面代码中的is_collective=True表示采用集合通信的GPU分布式模式训练模型。

paddle.enable_static()
configs = X.parse_train_configs()
fleet.init(is_collective=True)
加载模型及数据

用户可以通过X.applications接口加载我们预先定义好的模型,如:Resnet50、VGG16、BERT等。并使用定制化的data_loader加载模型,同时可以定义训练中使用的batch_size等参数。下面的例子中,我们使用了recompute对Bert_large模型所支持的最大Batch Size(130)来进行训练。

与此同时,用户可以使用我们的`Downloader`接口下载预先保存的Wiki数据集。

model = X.applications.BertLarge()
downloader = X.utils.Downloader()
local_path = downloader.download_from_bos(
    fs_yaml='https://fleet.bj.bcebos.com/small_datasets/yaml_example/wiki_cn.yaml',
    local_path='./data')
data_loader = model.get_train_dataloader(
    local_path,
    max_seq_len=512,
    batch_size=130,
)
定义Recompute Strategy 及 Optimizer

接下来我们就可以定义分布式训练中所应用到的策略了。下面的例子中,为了使用Recompute策略,我们将dist_strategy.recompute设置为True 并设置我们事先定义好的checkpoints。

接下来用户需要定义训练中更新模型所用到的优化器,并使用fleet.distributed_optimizer接口将优化器转换为分布式模式。

最后运行optimizer.minimize(model.loss) 将反向计算的算子插入训练网络,我们就可以开始训练了。

dist_strategy = fleet.DistributedStrategy()
# 使用Recompute,并设置checkpoints
dist_strategy.recompute = True
dist_strategy.recompute_configs = {"checkpoints": model.checkpoints}

optimizer = fluid.optimizer.Adam(learning_rate=configs.lr)
optimizer = fleet.distributed_optimizer(optimizer, dist_strategy)
optimizer.minimize(model.loss)
开始训练

在 FleetX 中,我们为用户提供了X.MultiGPUTrainer 接口,用于GPU分布式训练。其中modeldata_loader 分别为第二步中加载的模型及数据。start_step 表示开始打印训练log的步数,若用户想复现我们的模型训练速度数据建议设置成10或者更大的数;若用户想查看模型的收敛情况,则可设置成0。

trainer = X.MultiGPUTrainer()
trainer.fit(model, data_loader, epoch=10)
运行训练脚本

完成脚本的编写后我们就可以使用以下命令训练分布式模型:

fleetrun --gpus 0,1,2,3,4,5,6,7 bert_recompute.py
效果测试

我们在BERT模型上对recompute的效果进行了测试,使用Recompute后Batch size可以扩大9倍多。与混合精度一起使用时,Batch_size可以进一步扩大。其中,速度记录的是分布式训练任务每秒可以训练的样本数。

Model

Baseline

Recompute

Recompute + mixed precision

Batch size

14

130

145

speed

69.92 sents/s

45.76 sents/s

75.84 sents/s

Gradient Merge

下面,我们介绍如何使用 Gradient Merge 来扩大BERT模型分布式训练中的 Batch Size(假设脚本名称为bert_gradient_merge.py):

与 Forward Recompute Backpropagation 相同,我们首先要添加依赖,定义分布式模式并加载模型及数据。

添加依赖
# -*- coding: UTF-8 -*-
import paddle
import fleetx as X
import paddle.fluid
import paddle.distributed.fleet as fleet
定义分布式模式并初始化
加载模型及数据
model = X.applications.Bert_large()
downloader = X.utils.Downloader()
local_path = downloader.download_from_bos(
    fs_yaml='https://fleet.bj.bcebos.com/small_datasets/yaml_example/wiki_cn.yaml',
    local_path='./data')
data_loader = model.(
    local_path,
    max_seq_len=512,
    batch_size=13,
)
定义Gradient Merge Strategy 及 Optimizer

在上面的代码中,我们定义了Batch Size为13,在这一步中,通过设置k_steps,使用4个Batch Size来模拟一个大Batch的训练,从而达到了Batch size为52的训练效果。

gradient_merge_configs中,avg选项用于控制梯度累计的形式:当被设置为 True 时,会对每次的梯度求和并做平均;反之将直接对梯度求和,并对参数进行更新。

dist_strategy = fleet.DistributedStrategy()
# 使用Gradient merge策略并设置相关参数
dist_strategy.gradient_merge = True
dist_strategy.gradient_merge_configs = {"k_steps": 4, "avg": True}
optimizer = fluid.optimizer.Adam(learning_rate=configs.lr)
optimizer = fleet.distributed_optimizer(optimizer, dist_strategy)
optimizer.minimize(model.loss)
开始训练

Gradient Merge 的训练代码与 Recompute 策略相同,用户使用两行代码即可开始训练:

trainer = X.MultiGPUTrainer()
trainer.fit(model, data_loader, start_step=10)
运行训练脚本
fleetrun --gpus 0,1,2,3,4,5,6,7 bert_gradient_merge.py

使用LARS / LAMB 优化分布式超大batch 训练

简介

在数据并行分布式训练场景中, 常使用增加GPU数量的方式来加速训练. 为了保证GPU的算力得到充分利用, 每张GPU卡上的batch size都需要足够大. 因此在增加GPU 数量同时, 训练的全局batch size 也会变大.

但越大的全局batch size 会带来训练的收敛问题[1] [2]:

  • 模型最终精度损失

  • 收敛速度变慢, 需要更多的epoch 才能收敛

LARS[3] 和 LAMB[4] 两个优化策略常用来解决上述超大batch 训练中的收敛问题.

Paddle 实现了这两种优化策略,paddle.distributed.fleet 作为Paddle通用的分布式训练API提供了简单易用的接口, 用户只需要添加几行代码 就可将策略加入到原有的训练中。 通过这两个优化策略, 我们在超大batch 场景中实现了更快的收敛速度和无损的精度, 结合Fleet 中其他的策略(e.g. AMP) 可以极大缩短的训练整体的time2train.

试验效果

resnet50 imagenet

Global batch size

epoch

top1

[Goyal et al]

8k

90

76.3%

LARS Paper

32k

90

72.3%

[fleet: lars + amp]

16k

60

76.2%

[fleet: lars + amp]

32k

62

75.9%

LARS

我们以在单机多卡上Resent50 训练为简单例子介绍fleet 中 LARS的用法。

import os
import fleetx as X
import paddle
paddle.enable_staic()
import paddle.fluid as fluid
import paddle.distributed.fleet.base.role_maker as role_maker
import time
import paddle.distributed.fleet as fleet

通过X.parse_train_configs()接口,用户可以定义训练相关的参数,如:学习率、衰减率等。同时通过fleet.init()接口定义了分布式模型,下面代码中的is_collective=True表示采用集合通信的GPU分布式模式训练模型。

paddle.enable_static()
configs = X.parse_train_configs()
fleet.init(is_collective=True)

用户可以通过X.applications接口加载我们预先定义好的模型,如:Resnet50、VGG16、BERT等。并使用定制化的data_loader加载模型,同时可以定义训练中使用的batch_size等参数。

model = X.applications.Resnet50()
downloader = X.utils.Downloader()
local_path = downloader.download_from_bos(
    fs_yaml='https://fleet.bj.bcebos.com/test/loader/small_imagenet.yaml',
    local_path='./data')
batch_size = 32
loader = model.get_train_dataloader(local_path, batch_size=batch_size)

LARS 优化算法的公式如下:

\[local\_learning\_rate = learning\_rate * lars\_coeff * \frac{||param||}{||gradient|| + lars\_weight\_decay * ||param||}\]
\[\begin{split}velocity = mu * velocity + local\_learning\_rate * (gradient + lars\_weight\_decay * param + epsilon) \\\end{split}\]
\[\begin{split}param = param - velocity \\\end{split}\]

可以看到LARS 其实是在 带weight decaymomentum 优化器的基础上加入了local learning rate 的逻辑, 对每一层的learning rate 进行了放缩. fleet 将 LARS实现为一个 fleet meta optimizer, 在使用时需要设置一下几点:

  1. LARS meta optimizer 的 inner optimizer 必须为 momentum, 并在 momentum 中定义 mulr 参数.

  2. 在 fleet dist_strategy 定义LARS 特有的 lars_coeff 参数和 lars_weight_decay 参数.

    • LARS 已经将 weight decay 包含进公式中, 用户不需要再在 optimizer中设置 regularization.

    • fleet 中还提供 lars_weight_decay 过滤策略, 可以通过在exclude_from_weight_decay 参数加入对应layer 的 name string, 让这一 layer 的参数不进行 lars weight decay. (通常我们将``BN`` 参数 和 FC_bias 从lars weight decay 中过滤)

dist_strategy = fleet.DistributedStrategy()

dist_strategy.lars = True
dist_strategy.lars_configs = {
                    "lars_coeff": 0.001,
                    "lars_weight_decay": 0.0005,
                    "exclude_from_weight_decay": ['batch_norm', '.b_0']
                }

optimizer = fluid.optimizer.Momentum(learning_rate=0.01, momentum=0.9)
optimizer = fleet.distributed_optimizer(optimizer, dist_strategy)
optimizer.minimize(model.loss)

这一部分和fleet 中其他任务基本相同:

place = fluid.CUDAPlace(int(os.environ.get('FLAGS_selected_gpus', 0)))
exe = fluid.Executor(place)
exe.run(fluid.default_startup_program())

for i, data in enumerate(loader()):
    start_time = time.time()
    cost_val = exe.run(model.main_prog,
                        feed=data,
                        fetch_list=[model.loss.name])

    end_time = time.time()
    print(
        "worker_index: %d, step%d cost = %f, speed: %f"
        % (fleet.worker_index(), i, cost_val[0], batch_size / (end_time - start_time)))

运行训练脚本

完成上述脚本的编写后,我们就可以使用以下命令一行启动单机多卡分布式训练:

fleetrun --gpus 0,1,2,3,4,5,6,7 --log_dir log example_lars.py

LAMB

我们以在单机多卡上Bert 训练为简单例子介绍fleet 中LAMB 的用法.

import os
import fleetx as X
import paddle
paddle.enable_staic()
import paddle.fluid as fluid
import paddle.distributed.fleet.base.role_maker as role_maker
import time
import paddle.distributed.fleet as fleet

这一步和上文中的LARS 一致。

paddle.enable_static()
configs = X.parse_train_configs()
fleet.init(is_collective=True)

这一步和上文中的LARS 一致。

model = X.applications.Resnet50()
downloader = X.utils.Downloader()
local_path = downloader.download_from_bos(
    fs_yaml='https://fleet.bj.bcebos.com/test/loader/small_imagenet.yaml',
    local_path='./data')
batch_size = 32
loader = model.get_train_dataloader(local_path, batch_size=batch_size)

LAMB 优化算法的公式如下:

\[\begin{split}m_t = \beta_1 m_{t - 1}+ (1 - \beta_1)g_t \\\end{split}\]
\[\begin{split}v_t = \beta_2 v_{t - 1} + (1 - \beta_2)g_t^2 \\\end{split}\]
\[\begin{split}r_t = \frac{m_t}{\sqrt{v_t}+\epsilon} \\\end{split}\]
\[\begin{split}w_t = w_{t-1} -\eta_t \frac{\left \| w_{t-1}\right \|}{\left \| r_t + \lambda w_{t-1}\right \|} (r_t + \lambda w_{t-1}) \\\end{split}\]

和LARS 类似, LAMB 也是在内层优化器的基础上, 套了一个local learning rate 的逻辑, 对每一层的learning rate 进行了放缩. fleet 将 LAMB实现为一个 fleet meta optimizer, 在使用时需要设置一下几点:

  1. LAMB meta optimizer 的 inner optimizer 必须为 adam, 并在 adam 中定义 学习率lr, 一阶 moment 的指数衰减率beta1 和二阶moment 的指数衰减率beta2 参数.

  2. 在 fleet dist_strategy 定义LAMB 特有的 lamb_weight_decay 参数.

    • LAMB 已经将 weight decay 包含进公式中, 用户不需要再在 optimizer中设置 regularization.

    • fleet 中还提供 lamb_weight_decay 过滤策略, 可以通过在exclude_from_weight_decay 参数加入对应layer 的 name string, 让这一 layer 的参数不进行 lars weight decay. (通常我们将``LN`` 从lamb weight decay 中过滤)

dist_strategy = fleet.DistributedStrategy()

dist_strategy.lamb = True
dist_strategy.lamb_configs = {
                    'lamb_weight_decay': 0.01,
                    'exclude_from_weight_decay': ['layer_norm'],
                }

optimizer = paddle.optimizer.Adam(learning_rate=0.01, beta1=0.9, beta2=0.999)
optimizer = fleet.distributed_optimizer(optimizer, dist_strategy)
optimizer.minimize(model.loss)

这一部分和fleet 中其他任务基本相同:

place = fluid.CUDAPlace(int(os.environ.get('FLAGS_selected_gpus', 0)))
exe = fluid.Executor(place)
exe.run(fluid.default_startup_program())

for i, data in enumerate(loader()):
    start_time = time.time()
    cost_val = exe.run(model.main_prog,
                        feed=data,
                        fetch_list=[model.loss.name])

    end_time = time.time()
    print(
        "worker_index: %d, step%d cost = %f, speed: %f"
        % (fleet.worker_index(), i, cost_val[0], batch_size / (end_time - start_time)))

运行训练脚本

完成上述脚本的编写后,我们就可以使用以下命令一行启动单机多卡分布式训练:

fleetrun --gpus 0,1,2,3,4,5,6,7 --log_dir log resnet50_lamb.py

使用Fleet进行异构参数服务器训练

异构参数服务器目前仅支持在静态图下运行

什么是异构参数服务器?

在开始使用异构参数服务器前,您需要先了解参数服务器的基本知识。我们先进行简单回顾:

参数服务器的应用领域以及解决的问题

参数服务器集中应用在NLP推荐 以及 搜索领域,其主要针对以下两个问题:

  1. 大数据

    原始数据集庞大,动辄几百G的数据量,单机训练速度难以承受,需要依赖数据并行加速训练,以此解决大数据的问题。

  2. 大参数

    在上述场景中,数据量大的同时,伴随着特征的稀疏性,开发者们通常使用Embedding技术来将业务场景中的高维稀疏特征向量转化为低维的稠密特征向量。

    在工业场景中,该Embedding参数的维度往往是亿级,占用的存储空间在上百GB,单机内存以及单卡显存无法承受完整的参数保存。但我们可以观察到,使用每个batch的数据进行训练时,并不会用到全部的稀疏参数,仅需与样本有关的部分参数,内存或显存可以承受。因此在这种场景下,开发者们通常使用参数服务器模式,将大参数分片放到各个Server上。Worker训练模型时,仅请求当前batch数据需要用到的参数,以此解决大参数的问题。

传统参数服务器的局限

当前参数服务器的Worker节点,通常使用CPU或GPU机器完成模型训练中的前向与反向部分。

Worker使用的设备确定,其硬件算力的配比也随之固定。固定的算力配比,在工业应用中,存在着以下问题:

  1. GPU机器利用率较低

    若训练的模型不复杂,如推荐领域常用的DeepFMLR,模型计算耗时并不高,而数据读取(IO)的性能决定了网络训练的速度,也同时限制了GPU机器的利用率,简单的将IO任务交给GPU机器上的CPU,或使用Nvidia安倍架构+DALI处理数据不能解决该问题。

  2. CPU机器算力有瓶颈

    CPU机器通常核心数较多,并且机器价格也更便宜,可以充分利用CPU多核的优势,在简单模型上极大的提升数据吞吐,整体训练达到较好的性能。但是,随着深度学习模型的日渐复杂,在一些计算能力要求高的模型中,比如BERT,计算能力严重不足,模型计算耗时极高。

  3. 新型算力接入成本较大

    随着AI芯片发展日新月异,各种高算力低成本的芯片已进入工业实用化阶段。但是开发者们使用AI芯片的门槛依然较高,例如软件栈的改变,集群的迁移等,新硬件的接入成本较高。

异构参数服务器介绍

那么,可不可以动态调整机器配比?同时解决IO瓶颈以及算力瓶颈,并且快速支持新硬件的接入呢?

PaddlePaddle基于工业实践,创新性的提出了异构参数服务器,支持不同算力的芯片混合异构训练,如CPU、v100,p40,昆仑芯片,对不同算力的芯片高效利用,使得训练任务对设备不敏感,获得更高的加速效果。

heter_overview
异构参数服务器基本原理

一个深度学习模型的训练过程可以拆分为三步:1、前向计算Loss;2、反向计算梯度;3、利用梯度更新参数。

参数服务器模式下,前向及反向步骤在Worker端(也称为Trainer)完成,参数更新在Server端完成。

异构参数服务器模式中,我们进一步拆分前向及反向,可以将embedding查表,输入数据的reshape等IO密集型的OP放置于CPU-Trainer,将fc,attention等计算密集型的OP放置于Heter-Trainer

CPU-TrainerHeter-Trainer之间会进行通信,交换网络运行所需要的上下文参数,两者协同完成前向和反向步骤,并将梯度发给Server,完成参数的更新。

heter_example
异构参数服务器底层原理
  • 单机训练的运行原理图

single_program
  • 传统参数服务器的运行原理图

async_program
  • 异构参数服务器的运行原理图

heter_program
异构参数服务器使用方法

下面介绍异构参数服务器的使用方法,推荐先在正常参数服务器模式下运行成功,再开始调试异构参数服务器模式。下面介绍的使用方法,均为在正常参数服务器模式基础上的增量变动,请知晓。

以下示例的完整代码位于FleetX/example/heter_parameter_server/demo.py

  • 1、设置运行在异构设备上的组网

深度学习组网,通常可以拆解为两部分:1、IO密集型组网;2、计算密集型组网,如下面的DNN组网所示:

# --------- IO 密集型网络 ---------
# 数据输入 & embedding 查表 & sequence_pool 等操作
input_data = paddle.data(name="sparse_input", shape=[None, 1], dtype="int64")
input_label = paddle.data(name="label", shape=[None, 1], dtype="int64")
embedding = paddle.static.nn.embedding(input_data, is_sparse=True, size=[1000,128])

# --------- 计算 密集型网络 ---------
# fc & cnn & rnn & attention 等网络结构
fc1 = paddle.static.nn.fc(embedding, size=1024, act="relu")
fc2 = paddle.static.nn.fc(fc1, size=512, act="relu")
fc3 = paddle.static.nn.fc(fc2, size=256, act="relu")
predict = paddle.static.nn.fc(fc3, size=2, act="softmax")
cost = paddle.nn.functional.cross_entropy(input=predict, label=input_label)

我们可以使用fluid.device_guard()API划分模型中各个OP的运行设备,上述组网可以改变如下:

with fluid.device_guard("cpu"):
    input_data = paddle.data(name="sparse_input", shape=[None, 1], dtype="int64")
    input_label = paddle.data(name="label", shape=[None, 1], dtype="int64")
    label = paddle.cast(input_label, dtype="float32")
    embedding = paddle.static.nn.embedding(input_data, is_sparse=True, size=[1000,128])


with fluid.device_guard("gpu"):
    fc1 = paddle.static.nn.fc(embedding, size=1024, act="relu")
    fc2 = paddle.static.nn.fc(fc1, size=512, act="relu")
    fc3 = paddle.static.nn.fc(fc2, size=256, act="relu")
    predict = paddle.static.nn.fc(fc3, size=2, act="softmax")
    label = paddle.cast(label, dtype="int64")
    cost = paddle.nn.functional.cross_entropy(input=predict, label=label)

这样划分组网的作用是:

  1. IO密集型的OP适合在CPU设备上运行,使数据输入输出不再成为模型训练的瓶颈。

  2. 计算密集型OP放在GPU等AI芯片设备上,可以充分利用算力,加速模型训练。

与此同时,Paddle-异构参数服务器,支持并且建议您在训练时,CPU-Trainer的设备数量 >> Heter-Trainer的设备数量,可以充分增大数据的IO效率,同时充分利用异构设备的算力。

  • 2、异构参数服务器Strategy配置

使用fleet api启动异构参数服务器,需要配置DistributedStrategy,使用上述组网生成的cost,参数服务器模式下,我们使用如下代码添加Optimizer

strategy = paddle.distributed.fleet.DistributedStrategy()
strategy.a_sync = True

optimizer = fluid.optimizer.Adam(args.learning_rate)
optimizer = fleet.distributed_optimizer(optimizer, strategy)
optimizer.minimize(cost)

而在异构参数服务器模式下,仅需额外指定异构设备使用的device类型,其余保持不变,代码如下:

strategy = paddle.distributed.fleet.DistributedStrategy()
strategy.a_sync = True
# ---- 新增strategy配置, 指定异构设备的device类型 ----
strategy.a_sync_configs = {"heter_worker_device_guard": 'gpu'}

optimizer = paddle.optimizer.Adam(args.learning_rate)
optimizer = fleet.distributed_optimizer(optimizer, strategy)
optimizer.minimize(cost)
  • 3、异构参数服务器的启动环境变量配置

启动异构参数服务,需要在参数服务器的基础上,为异构设备指定:

  1. 设备IP及通信端口: PADDLE_HETER_TRAINER_IP_PORT_LIST=ip:port,ip:port,...

  2. 训练角色环境变量: TRAINING_ROLE=HETER_TRAINER

例如:

export PADDLE_HETER_TRAINER_IP_PORT_LIST='ip:port,ip:port'
export TRAINING_ROLE=HETER_TRAINER

当执行fleet初始化代码时:

fleet.init()

# 若进程检测到环境变量中配置了 PADDLE_HETER_TRAINER_IP_PORT_LIST,则会进入异构参数服务器模式,进行相应的计算图切分及初始化。

# 若进程检测到环境变量中 TRAINING_ROLE 存在,并且等于 HETER_TRAINER 时,则该进程扮演异构计算设备的角色

# 异构设备的设备类型由上文中提到的 strategy.a_sync_configs = {"heter_worker_device_guard": 'gpu'} 指定。

我们提供了一键启动的fleetrun功能,可以便利的启动异构参数服务器训练,将在下文介绍。

使用fleetrun启动异构参数服务器训练

fleetrunpaddle2.0rc版本以后新加入的分布式训练启动工具,可以参考fleetrun,下面介绍一下如何使用fleetrun启动异构参数服务器。

当训练代码ready以后,假如训练启动入口是train.py,则可按照以下的方式启动异构参数服务器训练:

方法一,针对单机模拟分布式训练,使用自动分配的ip和port

fleetrun --server_num=2 --worker_num=2 --heter_worker_num=2 train.py

方法二,针对单机,或自定义的多机训练,使用指定的ip及端口

fleetrun --servers=ip:port,ip:port --workers=ip:port,ip:port --heter_workers=ip:port,ip:port train.py

方法三,针对PaddleCloud平台的custom-framework模式,指定任务的启动命令

PaddleCloud是百度内部的深度学习任务平台,提供了便捷的提交流程以及任务管理功能,该平台完整功能将适时向广大开发者开放,更多信息,可以查阅PaddleCloud

# heter_worker数量会根据配置的GPU设备数量自动调整
# 添加该配置是为了指定fleetrun运行在异构参数服务器模式下
fleetrun --heter_worker_num=2 train.py
异构参数服务器使用示例

示例代码位于FleetX/example/heter_parameter_server/

  • 数据下载

bash sh download_data.sh

执行该脚本,会从国内源的服务器上下载Criteo数据集,并解压到指定文件夹。全量训练数据放置于./train_data_full/,全量测试数据放置于./test_data_full/,用于快速验证的训练数据与测试数据放置于./train_data/./test_data/

至此,我们已完成数据准备的全部工作。

  • 启动训练

# ps-cpu
fleetrun --server_num=2 --worker_num=2 heter_train.py

# ps-heter
fleetrun --server_num=2 --worker_num=2 --heter_worker_num=2 heter_train.py

1. 飞桨底层分布式API的使用案例

1.1 简介

卷积神经网络主要包含两种类型的模型层:卷积层和全连接层。卷积层包含约5%的模型参数量和约90-95%的模型计算量;全连接层包含约95%的模型参数量和约5-10%的模型计算量。通常来讲,卷积层适合采用数据并行,因为卷积层模型参数的通信开销更小;而全连接层适合采用模型并行,因为相比于全连接层的模型参数,全连接层的输出张量的通信开销更小。因此,本示例中,AlexNet模型的卷积层采用数据并行,全连接层采用模型并行。

本文档以AlexNet网络为例介绍如何使用飞桨的底层集合通信API实现模型并行训练。

1.2 模型并行原理和实现

1.2.1 版本要求

  • paddlepaddle 2.0-rc-gpu版本及以上

1.2.2 分布式API使用案例

本节,我们介绍如何实现全连接层的模型并行。

首先,汇聚各块GPU卡全连接层的输入数据,得到全局输入数据;并用全局数据和全连接层计算,得到各块GPU卡的全连接层输出,如下图所示。

step1

接着,汇聚各块GPU卡全连接层的输出数据,并抽取本块GPU的样本数据的全连接层输出,如下图所示。

step2

1.2.3 动态图实现

上述过程描述地完整前向计算过程实现代码如下:

# -*- coding: UTF-8 -*-
import paddle
import paddle.nn as nn

# 定义模型并行的全连接网络,需要继承自nn.Layer
class ModelParallelLinear(nn.Layer):
    def __init__(self,
                 in_dim,
                 rank_num,
                 rank_id,
                 class_num):
        super(ModelParallelLinear, self).__init__()
        if class_num % rank_num:
            raise ValueError("Number of classes must be divisible "
                             "the number of ranks.")
        shard_dims = class_num // rank_num
        self.linear = nn.Linear(in_dim, shard_dims)
        self.rank_num = rank_num
        self.rank_id = rank_id
        for parameter in self.linear.parameters():
            parameter.is_distributed = True

    def forward(self, x):
        global_x_list = []
        paddle.distributed.all_gather(global_x_list, x)
        global_x = paddle.concat(global_x_list, axis=0)
        out = self.linear(global_x)
        global_out_list = []
        paddle.distributed.all_gather(global_out_list, out)
        all_outs = paddle.concat(global_out_list, axis=1)
        out = paddle.split(all_outs, self.rank_num)[self.rank_id]
        return out

备注:因为每块GPU卡保存部分全连接层参数,上面的例子中设置模型参数的is_distributed属性为True,用于避免在反向阶段对相应的模型参数做基于all_reduce的同步操作。

完整地训练代码实现如下:

# -*- coding: UTF-8 -*-
import paddle
import paddle.nn as nn
import paddle.nn.functional as F
from paddle.fluid.dygraph import Conv2D
#分布式step 1: 导入paddle.distributed.fleet包
from paddle.distributed import fleet
from model_parallel_linear import ModelParallelLinear

# 定义全连接网络,需继承自nn.Layer
class SimpleModelParallelClassifierNet(nn.Layer):
    def __init__(self,
                 class_num,
                 rank_num,
                 rank_id):
        super(SimpleModelParallelClassifierNet, self).__init__()
        self.conv1 = nn.Conv2d(3, 64, kernel_size=11, stride=4, padding=2)
        self.max_pool1 = nn.MaxPool2d(kernel_size=3, stride=2)
        self.conv2 = nn.Conv2d(64, 192, kernel_size=5, padding=2)
        self.max_pool2 = nn.MaxPool2d(kernel_size=3, stride=2)
        self.conv3 = nn.Conv2d(192, 384, kernel_size=3)
        self.conv4 = nn.Conv2d(384, 256, kernel_size=3)
        self.conv5 = nn.Conv2d(256, 256, kernel_size=3)
        self.max_pool5 = nn.MaxPool2d(kernel_size=3, stride=2)
        self.model_parallel_linear1 = ModelParallelLinear(2304,
                                                          rank_num,
                                                          rank_id,
                                                          4096)
        self.model_parallel_linear2 = ModelParallelLinear(4096,
                                                          rank_num,
                                                          rank_id,
                                                          4096)
        self.model_parallel_linear3 = ModelParallelLinear(4096,
                                                          rank_num,
                                                          rank_id,
                                                          class_num)
        self.droupout = nn.Dropout(0.5)
        self.relu = nn.ReLU()

    def forward(self, x):
        x = self.conv1(x)
        x = self.relu(x)
        x = self.max_pool1(x)
        x = self.conv2(x)
        x = self.relu(x)
        x = self.max_pool2(x)
        x = self.conv3(x)
        x = self.relu(x)
        x = self.conv4(x)
        x = self.relu(x)
        x = self.conv5(x)
        x = self.relu(x)
        x = self.max_pool5(x)
        x = F.dropout(x, 0.5)
        x = paddle.reshape(x, [x.shape[0], -1])
        x = self.model_parallel_linear1(x)
        x = F.dropout(x, 0.5)
        x = self.model_parallel_linear2(x)
        out = self.model_parallel_linear3(x)
        return out

# 分布式step 2: 初始化fleet
fleet.init(is_collective=True)

# 1. 定义网络对象,损失函数和优化器
layer = SimpleModelParallelClassifierNet(class_num=1000,
                                         rank_num=fleet.worker_num(),
                                         rank_id=fleet.worker_index())
adam = paddle.optimizer.Adam(learning_rate=0.001,
                             parameters=layer.parameters())

# 分布式step 3: 通过fleet获取分布式优化器和分布式模型
adam = fleet.distributed_optimizer(adam)
dp_layer = fleet.distributed_model(layer)


for step in range(20):
    # 2. 执行前向网络
    image = paddle.randn([1, 3, 224, 224], 'float32')
    label = paddle.randint(low=0, high=10, shape=[1,1])
    output = dp_layer(image)
    loss = F.softmax_with_cross_entropy(output, label)
    loss = paddle.mean(loss)

    print("step:{}\tloss:{}".format(step, loss.numpy()))

    # 3. 执行反向计算和参数更新
    # 分布式step 4: 在执行反向(backward函数)前后进行损失缩放和反向梯度的聚合
    loss.backward()

    adam.step()
    adam.clear_grad()

将上述代码保存为train.py,假设要运行2卡任务,那么只需要在命令行执行下面的命令:

fleetrun --gpus=0,1 tain.py

使用流水线并行进行训练

简介

随着多种神经网络加速设备和专用神经网络计算芯片的出现,采用异构设备训练模型逐渐成为一种趋势。以CPU和GPU异构训练为例,CPU设备的并行计算能力较弱,但其具备数百GB到数TB的内存容量;与之不同,GPU设备具有强大的并行计算能力,但其显存容量仅为数十GB。同时,网络模型的不同层对计算能力和存储容量的需求差异显著。例如,神经网络的embedding层可以理解为查表操作,对计算能力的要求较低,但对存储容量的需求较大;与之相反,卷积类操作通常对存储容量的需求较低,但对计算能力的需求较高。因此,若能够根据异构设备和模型层间的不同特性,对模型的不同层使用不同的计算设备,可以优化模型训练过程。

原理

流水线并行分布式技术与数据并行不同,通过将模型切分到多个计算节点,并采用流水线执行的方式,实现模型的并行训练。以下图为例,模型被切分为三个部分,并分别放置到不同的计算设备(第1层放置到设备0,第2、3层被放置到设备1,第四层被放置到设备2);设备间通过通信的方式来交换数据。

pipeline

具体地讲,前向计算过程中,输入数据首先在设备0中通过第1层的计算得到中间结果,并将其传输到设备1,然后由设备1计算第2层和第3层,经过最后一层的计算后得到最终的前向计算结果;反向传播过程中,第四层使用前向计算结果得到相应的梯度数据,并由设备2传输到设备1,一次经过第3层和第二层,将结果传至设备0,经过第1层的计算完成所有的反向计算。最后,各个设备上的网络层会更新参数信息。

如下图,为流水线并行中的时序图。简单的流水线并行方式下,任一时刻只有单个计算设备处于激活状态,其它计算设备则处于空闲状态,因此设备利用率和计算效率较差。

pipeline_timeline1

为了优化流水线并行的性能,我们可以将mini-batch切分成若干更小粒度的micro-batch,提升流水线并行的并发度,达到提升设备利用率和计算效率的目的。如下图所示,一个mini-batch被切分为4个micro-batch;前向阶段,每个设备依次计算单个micro-batch的结果;这种减小mini-batch的方式减少了每个设备完成一次计算的时间,进而增加了设备间的并发度。

pipeline_timeline2

下面我们将通过例子为您讲解如何使用pipeline策略在两张GPU上训练模型。

使用样例

导入依赖

# -*- coding: UTF-8 -*-
import os
import argparse
import paddle
import time
import math
import numpy as np

import paddle.distributed.fleet as fleet
import paddle.static.nn as nn
paddle.enable_static()

定义模型

在使用流水线并行的训练策略时,我们通过device_guard接口将不同的计算层放置在不同的设备上。

对于CPU设备,在使用device_guard时只需要指定设备类型,即device_guard("cpu");对于GPU设备,除了指定设备类型外,还需要指定设备的id,如device_guard("gpu:0")

在下面的例子中,我们将数据层及embedding层放置在CPU中, 并将fc及loss放置在第0号GPU卡上。

# 模型组网
def build_network():
    # Step1: 使用device_gurad指定相应层的计算设备
    with paddle.fluid.device_guard("cpu"):
        data = paddle.data(name='sequence', shape=[1], dtype='int64')
        data_loader = paddle.io.DataLoader.from_generator(
            feed_list=[data],
            capacity=64,
            use_double_buffer=True,
            iterable=False)
        emb = nn.embedding(input=data, size=[128, 64])
    with paddle.fluid.device_guard("gpu:0"):
        fc = nn.fc(emb, size=10)
        loss = paddle.mean(fc)
    return data_loader, loss

定义数据集及梯度更新策略

定义完模型后,我们可以继续定义训练所需要的数据,以及训练中所用到的更新策略。

通过设定dist_strategy.pipeline 为True,将流水线并行的策略激活。

fleet.init(is_collective=True)

data_loader, loss = build_network()

dist_strategy = paddle.distributed.fleet.DistributedStrategy()
dist_strategy.pipeline = True
optimizer = paddle.fluid.optimizer.SGDOptimizer(learning_rate=0.1)
optimizer = fleet.distributed_optimizer(optimizer, dist_strategy)
optimizer.minimize(loss)

def train_reader():
    for _ in range(100):
        data = np.random.random(size=[32, 1]).astype("int64")
        yield data

开始训练

place = paddle.CPUPlace()
exe = paddle.static.Executor(place)

data_loader.set_sample_generator(train_reader, batch_size=2)

exe.run(paddle.static.default_startup_program())

data_loader.start()
exe.train_from_dataset(paddle.static.default_main_program())

低频通信参数服务器训练算法

简介

众所周知,在同步/异步参数服务器分布式训练中Trainer每训练完一个周期,都会将梯度上传至Server,等待Server分发最新的参数后才开始新一轮的训练。在这种训练方式中,节点间的通信会消耗大量的时间成本,进而影响训练的效率。

为了降低节点见通信对训练速度的影响,Fleet提供了一种更高效的参数更新策略:GeoSGD

原理

heter_overview

在GeoSGD更新策略中,Trainer的参数更新也是在全异步的条件下进行的。但与异步参数服务器有以下不同:

  • 与普通的参数服务器不同,在GEO策略中,每个Trainer负责在本地维护自己的参数更新,在训练一定数量的步数后将本轮训练出的参数与上一轮结束后的参数做差。并除以Trainer的个数,将结果上传至Server。Server则负责为每个Trainer计算其参数与全局参数的diff。

  • GEO更新策略会在训练过程中启动多个进程,负责参数更新及节点通信。在Trainer与Server的整个交互过程中,主进程会保持模型的训练,由子进程负责与server进行交互,在拿到与全局参数的diff后将其更新至主进程。

GEO策略通过模型训练与节点通信同步进行的方式,在保证模型效果的前提下,大大提升了训练的速度。经过在SGD优化器上的测试,GEO策略相比异步参数服务器,训练速度提高了1倍。

接下来我们将通过例子为您讲解GEO在Fleet中是如何应用的。

在开始之前我们首先需要下载训练中所需要的数据:

# 下载并解压数据,训练数据讲保存至名为 raw_data 的文件夹
wget --no-check-certificate https://fleet.bj.bcebos.com/ctr_data.tar.gz
tar -zxvf ctr_data.tar.gz

操作实践

添加依赖

首先我们需要添加训练中所用到的python模块,fleetx 可以用于加载我们为用户封装的接口如:加载模型及数据,模型训练等。paddle.distributed.fleet 中定义了丰富的分布式策略供用户使用。

import paddle
import fleetx as X
import paddle.fluid as fluid
import paddle.distributed.fleet as fleet
import paddle.distributed.fleet.base.role_maker as role_maker

定义分布式模式并初始化

通过X.parse_train_configs()接口,用户可以定义训练相关的参数,如:学习率、衰减率等。同时通过fleet.init()接口定义了分布式模式,定义GEO策略使用的初始化接口与同步/异步参数服务器相同,都是init()默认的模式。

paddle.enable_static()
configs = X.parse_train_configs()
role = role_maker.PaddleCloudRoleMaker()
fleet.init(role)

加载模型及数据

在这个例子中我们使用了与同步/异步参数服务器相同的CTR-DNN模型。用X.applications接口加载模型,并加载定制化的数据。

model = X.applications.MultiSlotCTR()
loader = model.load_multislot_from_file('./train_data')

定义同步训练 Strategy 及 Optimizer

在Fleet API中,用户可以使用fleet.DistributedStrategy()接口定义自己想要使用的分布式策略。

想要使用GEO策略,用户首先需要打开异步参数服务器开关,即设置a_sync为 True。

然后用户需要通过dist_strategy.a_sync_configs设置Trainer上传参数的频率,下面的代码中我们设置Trainer每训练10000个Batch后与Server进行交互。

dist_strategy = fleet.DistributedStrategy()
dist_strategy.a_sync = True
dist_strategy.a_sync_configs = {"k_steps": 10000}

optimizer = fluid.optimizer.SGD(learning_rate=0.0001)

optimizer = fleet.distributed_optimizer(optimizer, dist_strategy)
optimizer.minimize(model.loss)

开始训练

GEO策略的训练代码沿用了参数服务器分布式训练的形式。

对于Server节点,首先用init_server()接口对其进行初始化,然后启动服务并开始监听由训练节点传来的参数变化值。

同样对于训练节点,用init_worker()接口进行初始化后x,开始执行训练任务。运行X.Trainer.fit接口开始训练。

if fleet.is_server():
    fleet.init_server()
    fleet.run_server()
else:
    fleet.init_worker()
    trainer = X.Trainer(fluid.CPUPlace())
    trainer.fit(model, loader, epoch=10)

运行训练脚本

定义完训练脚本后,我们就可以用fleetrun指令运行分布式任务了。其中server_num, worker_num分别为服务节点和训练节点的数量。在本例中,服务节点有1个,训练节点有两个。

fleetrun --server_num=1 --worker_num=2 ctr_app.py

服务型弹性蒸馏

简介

蒸馏训练

在很多场景下,模型越大,层数越多,模型效果就越好。但受限于推理速度,显存资源等要求,大模型通常无法直接部署,需要对模型进行压缩。知识蒸馏是一种经典的模型压缩技术,由《Distilling the Knowledge in a Neural Network》 在2015年第一次提出,是将知识从一个复杂模型(Teacher)迁移到另一个轻量级模型(Student)上的方式来实现模型压缩。

如下图所示,训练步骤可以分为两步:

  • 训练好一个Teacher模型。

  • 使用Teacher模型的知识来训练Student模型。 所谓Teacher模型的知识是指Teacher模型的推理结果,我们称之为soft label,这个soft label将作为Student网络的训练目标,Student的推理结果需要尽可能接近Teacher的推理结果。

distillation

服务型蒸馏训练

服务型蒸和其他常见蒸馏方式的对比:

  • 离线蒸馏训练: 先使用Teacher模型做推理并将结果保存在磁盘中,然后Student模型使用磁盘中保存的样本和Teacher模型的推理结果作为数据集进行训练。这种训练方式一般需要数据增强,而且需要占用巨大的磁盘空间,因此应用环境受到了一定的限制。

  • 常规蒸馏训练: 常规蒸馏训练是指将 Teacher 模型和 Student 模型放入同一网络中,固定 Teacher 模型参数只做前向,Student 模型则正常做反向传播训练。这也是目前主流的蒸馏训练方式, 单这种方式下 Student 模型的训练完全依赖 Teacher 模型,Student 模型要等 Teacher 模型输出一个 batch 的推理结果才可以训练,而 teacher 模型也要等 Student 训练完一个 batch,才能开始下一个 batch 的推理,对整体的训练速度有一定的影响。

  • 服务型蒸馏训练: 是基于 Elastic Deep Learning 提出的一种训练方案。它将Teacher模型和Student模型解耦,Teacher模型被部署为线上推理服务,Student模型则以客户端的身份通过互联网实时发送样本到Teacher模型获取推理结果进行训练

服务型蒸馏训练收益

  • 节约显存资源: 由于Student模型和Teacher模型的解耦,所以服务型蒸馏训练可以使用异构的资源,也就是把Student模型和Teacher模型的部署到不同的设备上。原先受限于显存大小而难以部署到单个GPU卡上的蒸馏网络可以通过该方式部署到不同卡上。

  • 提升训练速度:由于节约了显存资源,这样就可以使Student模型能够训练更大的batch size;同时由于Student模型和Teacher模型是异构流水线,Student模型不用等Teacher模型推理结束后再训练。

  • 提高训练资源利用率:我们可以将Teacher模型部署到线上的弹性预估卡集群,利用线上预估卡闲时的算力资源提升蒸馏任务中Teacher模型侧的吞吐量。同时由于Teacher模型可以弹性调度,不用担心高峰时线上实例被抢占造成的任务失败。还可以将Teacher模型部署到集群碎片资源,或者如k40等使用率较低的资源上,充分利用集群的空闲、碎片资源。

  • 提升训练效率:用户可以根据Teacher和Student的吞吐性能灵活设置Teacher和Student的比例,也就是说多个老师可以教多个学生,而不是只能保持1比1的家教模式,最大限度地提高训练的产出。

EDL 服务型弹性蒸馏效果

ResNet50_vd模型, ImageNet 数据集

服务型弹性蒸馏

DistillReader

服务型弹性蒸馏的核心是将Teacher模型部署成了服务端,而Student模型成了客户端。 将Teacher模型被部署为在线可容错弹性服务, 在Student模型一侧则通过 DistillReader 来封装Student模型与Teacher模型之间的通信,访问Teacher服务。

DistillReader

DistillReader 产生可供Student模型训练的数据reader。如上图所示,Student模型将训练样本和标签传入训练reader,DistillReader从训练reader中读取训练样本发送给Teacher模型,然后获取推理结果。 DistillReader 的结构如下图。

推理结果和原训练reader中的数据封装在一起,返回一个包含推理结果的新reader给Student模型,这样Teacher 模型的推理和Student 模型的训练就可以流水行并行起来了。

DistillReader

可容错弹性服务

可容错弹性服务的实现架构如下图所示,首先我们通过Paddle Serving将多个Teacher模型部署成服务,并注册服务到Redis数据库中;Student模型则作为客户端从服务发现中查询所需的Teacher服务;服务发现从Redis数据库查询并按某种负载均衡策略返回客户端所需的Teacher列表;每当Teacher变化时,客户端就可以实时拿到最新Teacher列表,连接Teacher进行蒸馏训练,不用担心发生由于连接到被收回的Teacher资源而导致任务失败的请况。

Student 模型给Teacher 模型发送样本并获取推理结果,而Teacher 模型服务端则可以随意增删,弹性调整。

DistillReader

快速开始

下文通过训练图像分类模型来简单介绍服务型蒸馏训练的使用。

为简单起见,使用的是单机环境,服务端和客户端部署在了同一个服务器上,服务端的IP地址是127.0.0.1。如果部署在不同设备上,修改下代码中的IP地址即可。

环境准备

下命令拉取镜像,镜像为CUDA9.0的环境,在里面我们预装了EDL、飞桨核心框架和Padde Serving等相关依赖。

docker pull hub.baidubce.com/paddle-edl/paddle_edl:latest-cuda9.0-cudnn7
nvidia-docker run -name paddle_edl hub.baidubce.com/paddle-edl/paddle_edl:latest-cuda9.0-cudnn7 /bin/bash

启动Teacher模型

如下命令在1号GPU卡启动Teacher服务,其中Teacher模型为图像分类模型ResNeXt101_32x16d_wsl,服务的端口号为9898,并启动了内存优化功能。

cd example/distill/resnet

wget --no-check-certificate https://paddle-edl.bj.bcebos.com/distill_teacher_model/ResNeXt101_32x16d_wsl_model.tar.gz
tar -zxf ResNeXt101_32x16d_wsl_model.tar.gz

python -m paddle_serving_server_gpu.serve \
  --model ResNeXt101_32x16d_wsl_model \
  --mem_optim True \
  --port 9898 \
  --gpu_ids 1

启动Student模型训练

如下命令在0号GPU卡启动Student模型,启动的student模型为ResNet50_vd。 其中train_with_fleet.py是用于启动训练的脚本,用户需要在其中添加蒸馏训练相关的代码,如果用户想了解脚本的修改方法或可以参考如github

python -m paddle.distributed.launch --gpus 0 \
  ./train_with_fleet.py \
  --model=ResNet50_vd \
  --data_dir=./ImageNet \
  --use_distill_service=True \
  --distill_teachers=127.0.0.1:9898

优化低配网络的分布式GPU训练

在网络带宽较低的训练场景(如: 公有云上训练,联邦训练)中,梯度同步在低带宽网络下的延迟成为训练速度的主要瓶颈。 Fleet 作为Paddle通用的分布式训练API 实现了: Deep Gradient CompressionLocal SGD 两种训练策略来针对性解决这一问题。

DGC 优化低配网络的分布式GPU训练

DGC 简介

大规模分布式训练需要较高的网络带宽以便进行梯度的聚合更新,这限制了多节点训练的扩展性,同时也需要昂贵的高带宽设备。在低带宽的网络环境下进行分布式训练时,梯度同步成为训练加速的瓶颈。 Deep Gradient Compression 发现:分布式SGD中有99.9%的梯度交换都是冗余的,可以使用深度梯度压缩选择重要梯度进行通信来减少通信量,降低对通信带宽的依赖。Fleet 实现了DGC的稀疏通信方式,可有效在低配网络下进行GPU分布式训练。Fleet 实现了 DGC 论文中的 预热训练 (warming up training), 动量修正 (Momentum Correction), 局部梯度修剪 (local gradient clipping), 动量因子掩藏 (Momentum factor masking) 等策略, 和 正则化项修正 (Weight Decay Correction) 避免稀疏梯度通信训练带来的最终模型精度损失。

下面将介绍 DGC 稀疏通信方式的适用场景、试验效果、基本原理和使用方法。

适用场景

DGC稀疏通信在低带宽通信瓶颈时会有较大的性能提升,但在单机多卡及RDMA网络通信并非瓶颈情况下,并不会带来性能上的提升。同时由于AllGather的通信量会随卡数的增多而增大,所以DGC的多机训练规模也不宜过大。故DGC适用于低配网络,同时节点规模不宜过大,如>128张卡。在云网络或高带宽网络设备昂贵时,DGC可有效降低训练成本。

试验效果

  • 模型:FasterRCNN

  • 硬件: P40两机分布式,每台机器一卡,TCP网络测试。

  • 取300-700步耗时/400step。

  • 精度无损。

DGC 原理简介

这里将简单介绍介绍Fleet DGC 中的一些原理和对应参数应该如何设置。

梯度稀疏

DGC的基本思路是通过只传送重要梯度,即只发送大于给定阈值的梯度来减少通信带宽的使用。为避免信息的丢失,DGC会将剩余梯度在局部累加起来,最终这些梯度会累加大到足以传输。 换个角度,从理论依据上来看,局部梯度累加等同于随时间推移增加batch size,(DGC相当于每一个梯度有自己的batch size)。

假设 N是训练节点个数, b为单卡batch size,局部梯度累加可以被认为batch size从\(Nb\)增大为\(NbT\),其中T是两次更新的稀疏通信间隔。详细的公式推导请参阅 [1] 预热调参 ^^^^^^^^

对于正常的训练,使用DGC一般需进行预热训练,否则可能会有精度损失。由于paddle稀疏梯度聚合通信使用了AllGather,通信量会随卡数增加而增长,所以在卡数较多时不推荐较低稀疏度的预热训练。参数设置如下:

# 1. 以1252个step为一个epoch,前2个epochs使用正常dense通信,后3个epochs逐步提升稀疏度为99.9%
strategy.dgc_configs = {
    "rampup_begin_step": 1252*2,
    "rampup_step": 1252*3,
    "sparsity": [0.984375, 0.996, 0.999]
}
# 2. 前面4个epochs都使用dense通信,之后默认0.999稀疏度运行
strategy.dgc_configs = {
    "rampup_begin_step": 1252*4,
    "rampup_step": 1,
    "sparsity": [0.999]
}

对于Fine-tuning训练,可无需预热训练,从第0个epoch直接使用DGC即可。

# 从第0步开始DGC稀疏通信
strategy.dgc_configs = {
    "rampup_begin_step": 0,
    "rampup_step": 1,
    "sparsity": [0.999]
}
局部梯度累加改进

正常情况,稀疏更新会严重影响收敛性。DGC中采用动量修正(Momentum Correction)和局部梯度裁减(Local Gradient Clipping), 动量因子掩藏, 正则化项修正 4个策略来解决这个问题。

动量修正

上文”局部梯度累加等同于随时间推移增加batch size“的推导没有考虑 Momentum存在的情况。当稀疏度很高时,使用原始Momentum 公式会显著降低模型性能,所以需要在原始公式的基础上对梯度进行修正。

动量修正使用部累加速度项\(u_t\)而非累加真实的梯度\(\nabla_{k, t}\)来修改Momentum 方程,修正后的动量更新公式如下:

\[u_{k, t}=m u_{k, t-1}+\nabla_{k, t}, \quad v_{k, t}=v_{k, t-1}+u_{k, t}, \quad w_{t+1}=w_{t}-\eta \sum_{k=1}^{N} \operatorname{sparse}\left(v_{k, t}\right)\]
局部梯度修剪

梯度修剪是防止梯度爆炸的常用方法。这方法由Pascanu等人在2013年提出,当梯度的l2-norms和大于给定阈值时,就对梯度rescale。正常梯度修剪在梯度聚合后使用,而DGC因为每个节点独立的进行局部梯度累加,所以DGC在使用\(G_t\)累加前对其进行局部梯度修剪。阈值缩放为原来的\(N^{-1/2}\)

动量因子掩藏

因为推迟了较小梯度更新权重的时间,所以会有权重陈旧性问题。稀疏度为99.9%时大部分参数需600到1000步更新一次。迟滞效应会减缓收敛并降低模型精度。DGC中使用下面方程来掩藏动量因子减缓陈旧性问题。

\[Mask \leftarrow\left|v_{k, t}\right|>t h r, \quad v_{k, t} \leftarrow v_{k, t} \odot \neg Mask, \quad u_{k, t} \leftarrow u_{k, t} \odot \neg Mask\]

此掩码可以停止延迟梯度产生的动量,防止陈旧梯度把权重引入错误的方向。

正则化(Weight Decay)项修正

类似动量修正,DGC 中我们同样需要对正则化项进行修正来让参数的延迟更新方向更加准确。

和动量修思路相同,修正需要在局部梯度上添加局部Weight Decay。

\[\nabla_{k, t}=\nabla_{k, t}+\frac{\lambda}{N} w_{t}\]

上述策略已经在Fleet 框架中实现,用户无须设置。

DGC 快速开始

下文以单机八卡上训练ResNet50 为例子简单介绍 Fleet 中 DGC 的使用。 因为 8张 GPU 的通信都在同一节点内, 一般情况下梯度通信并不会成为训练的瓶颈, 这里只是以其为例子,介绍Fleet 中 DGC 参数的设置。

注意

  • 硬件环境要求: DGC目前只支持GPU多卡及分布式collective训练,需要有相应的cuda、cuDNN、nccl环境。

  • Paddle环境要求: DGC只支持GPU,所以需GPU版本的Paddle。

添加依赖
import os
import fleetx as X
import paddle
import paddle.fluid as fluid
import paddle.distributed.fleet.base.role_maker as role_maker
import time
import paddle.distributed.fleet as fleet
定义分布式模式并初始化

通过X.parse_train_configs()接口,用户可以定义训练相关的参数,如:学习率、衰减率等。同时通过fleet.init()接口定义了分布式模型,下面代码中的is_collective=True表示采用集合通信的GPU分布式模式训练模型。

paddle.enable_static()
configs = X.parse_train_configs()
fleet.init(is_collective=True)
加载模型及数据

用户可以通过X.applications接口加载我们预先定义好的模型,如:Resnet50、VGG16、BERT等。并使用定制化的data_loader加载模型,同时可以定义训练中使用的batch_size等参数。

model = X.applications.Resnet50()
downloader = X.utils.Downloader()
local_path = downloader.download_from_bos(
    fs_yaml='https://fleet.bj.bcebos.com/test/loader/small_imagenet.yaml',
    local_path='./data')
batch_size = 32
loader = model.get_train_dataloader(local_path, batch_size=batch_size)
DGC 相关策略

这里假设:1252个step为一个epoch,前2个epochs使用正常dense通信,后3个epochs逐步提升稀疏度为99.9%

  • rampup_begin_step (int):DGC(含预热训练)开始的 step

  • rampup_step (int):DGC中预热训练持续的 step. 如果sparsity 是 [0.75, 0.9375, 0.984375, 0.996, 0.999],rampup_step 设成 100时, 在 0~19 steps 时 sparsity=0.75,在 20~39 steps 时 sparsity=0.9375, 以此类推。

  • sparsity (list[float]):稀疏度 threshold, (1 - current sparsity) % 的gradient 将会被 allreduce。

dist_strategy = fleet.DistributedStrategy()

dist_strategy.dgc = True
dist_strategy.dgc_configs = {
    "rampup_begin_step": 1252*2,
    "rampup_step": 1252*3,
    "sparsity": [0.984375, 0.996, 0.999]
}

optimizer = fluid.optimizer.Momentum(learning_rate=0.01, momentum=0.9)
optimizer = fleet.distributed_optimizer(optimizer, dist_strategy)
optimizer.minimize(model.loss)
开始训练

这一部分和Fleet 中其他任务基本相同:

place = fluid.CUDAPlace(int(os.environ.get('FLAGS_selected_gpus', 0)))
exe = fluid.Executor(place)
exe.run(fluid.default_startup_program())

for i, data in enumerate(loader()):
    start_time = time.time()
    cost_val = exe.run(model.main_prog,
                        feed=data,
                        fetch_list=[model.loss.name])

    end_time = time.time()
    print(
        "worker_index: %d, step%d cost = %f, speed: %f"
        % (fleet.worker_index(), i, cost_val[0], batch_size / (end_time - start_time)))
运行训练脚本

一行启动单机多卡分布式训练:

fleetrun --gpus 0,1,2,3,4,5,6,7 --log_dir log ./example_dgc.py

# reader shuffle seed 0
# trainerid, trainer_count 0 8
# read images from 0, length: 160146, lines length: 160146, total: 1281168
# worker_index: 0, step0 cost = 7.151402, speed: 37.698432
# worker_index: 0, step1 cost = 7.112389, speed: 101.518513
# worker_index: 0, step2 cost = 7.004275, speed: 111.062341
# worker_index: 0, step3 cost = 7.039385, speed: 62.173126
# worker_index: 0, step4 cost = 6.985911, speed: 104.058060
# ......

使用Local SGD 优化低带宽下分布式训练

Local SGD 简介

在使用 distributed SGD 进行数据并行的分布式训练时,常会遇到以下两个问题:

  • 分布式训练的吞吐会受到集群中随机慢节点(straggling node)和通信延迟的影响。

  • 数据并行分布式增大了训练实际的batch size,过大的batch size 会影响最终的训练精度。

Local SGD 通过延长节点间同步的间隔(局部异步训练)来减轻慢节点的影响和减少通信频率,以此提升训练的吞吐速度;另一方面,为了减小相对于本地训练(小batch size)的精度损失,[1][2] 分别提出了:post-Local SGD自适应步长 (Adaptive Communication) Local SGD 策略,来减少参数同步频率降低带来的精度损失。 同步SGD 和 Local SGD 在通信同步上的差异如下图所示。

Synchronous SGD 和 Local SGD

在Local SGD 训练中,集群中的每个 trainer 各自会独立的进行 H 个连续的 SGD 更新, 然后集群中的所有 trainer 会进行通信,同步(averaging)所有 trainers 上的参数。一个双 trainers,同步间隙为3 步长(iterations) 的Local SGD过程如下图所示。黄绿两条路径表示两个 trainers 各自的 Local SGD 更新过程,中间的蓝色路径表示同步后的模型所在的位置。

Local SGD

Local SGD中的一个关键问题是如何确定参数同步的间隔(频率),以到达训练吞吐和训练精度间更好的平衡:

  • 增大参数同步的间隔可以减少 trainers 间通信延迟的影响提高训练吞吐.

  • 但增大同步间隔可能会造成最终训练精度的损失。 [1]

以下两个策略从不同角度试图达到更好的平衡:

  • post Local SGD 将训练过程分成两个阶段:第一阶段 wokers 间同步的间隔为 1 个步长,即同步SGD,来保证最终训练精度;在第二阶段增大同步间隔到固定常数 H,来提升训练吞吐。

  • Adaptive Communication Local SGD 通过动态的调整参数同步的间隔来尝试达到训练吞吐和精度间的更好的平衡。在训练初始或者上一段参数同步完成后,根据如下公式计算一下次参数同步的间隔(iteration)。详细的公式推导和参数定义请参考原论文。

Fleet 中实现了 post Local SGDAdaptive Communication Local SGD 两种策略。 中下文将给出 Fleet中 Local SGD 的实践效果,并通过一个简单例子介绍如何在Fleet 中使用 Local SGD。

试验效果

试验设置

model

dataset

local batch size

cluster

dtype

warming up

learning rate decay

resnet50

Imagenet

128

4 x 8 x V100

FP32

30

polynomial

试验结果

local step

qps

acc1

acc5

1

8270.91

0.7579

0.9266

2

8715.67

0.7533

0.9265

4

8762.66

0.7551

0.9260

8

9184.62

0.7511

0.9239

16

9431.46

0.7429

0.9206

ADACOMM

8945.74

0.7555

0.9270

可以看到在 post Local SGD (固定同步间隔)情况下,更新间隔越长训练的吞吐越高,但是模型的最终进度也会损失越大。 当使用 ADAPTIVE COMMUNICATION 策略后,训练在吞吐和精度间达到了一个更好的平衡。

Local SGD 快速开始

下文将以在单机8卡中训练 ResNet50 为例子简单介绍 Fleet 中 Local SGD 的用法。 需要注意的是 单机八卡的通信都在同一节点内, 一般情况下参数同步并不会成为训练的瓶颈, 这里只是以其为例子,介绍Fleet 中 Local SGD 参数的设置。

添加依赖
import os
import fleetx as X
import paddle
import paddle.fluid as fluid
import paddle.distributed.fleet.base.role_maker as role_maker
import time
import paddle.distributed.fleet as fleet
定义分布式模式并初始化

通过X.parse_train_configs()接口,用户可以定义训练相关的参数,如:学习率、衰减率等。同时通过fleet.init()接口定义了分布式模型,下面代码中的is_collective=True表示采用集合通信的GPU分布式模式训练模型。

paddle.enable_static()
configs = X.parse_train_configs()
fleet.init(is_collective=True)
加载模型及数据

用户可以通过X.applications接口加载我们预先定义好的模型,如:Resnet50、VGG16、BERT等。并使用定制化的data_loader加载模型,同时可以定义训练中使用的batch_size等参数。

model = X.applications.Resnet50()
downloader = X.utils.Downloader()
local_path = downloader.download_from_bos(
    fs_yaml='https://fleet.bj.bcebos.com/test/loader/small_imagenet.yaml',
    local_path='./data')
batch_size = 32
loader = model.get_train_dataloader(local_path, batch_size=batch_size)
定义Local SGD 相关策略

用户首先需要定义paddle SGD 对象,并在SGD 对象中设置学习率参数。目前local SGD和自适应步长 local SGD都仅支持SGD和Momentum两种优化器。

  • post Local SGD 中,有两个用户设置参数 begin_stepk_steps,局部更新和参数同步都由框架自动完成。begin_step 指定从第几个step之后进行local SGD算法,取值为大于0的整数;k_step 指定训练过程中的全局参数更新间隔,取值为大于0的整数。

dist_strategy = fleet.DistributedStrategy()
dist_strategy.localsgd = True
dist_strategy.localsgd_configs = {
    "k_steps": 1,
    "begin_step": 1,
}

optimizer = fluid.optimizer.SGD(learning_rate=0.01)
optimizer = fleet.distributed_optimizer(optimizer, dist_strategy)
optimizer.minimize(model.loss)
  • 自适应步长 local SGD 中,有两个用户设置参数 begin_stepinit_k_steps。begin_step 指定从第几个step之后进行自适应local SGD算法,取值为大于0的整数;用户需要设置init_k_steps作为第一次参数同步的间隔,之后的同步间隔将由上文中的公式动态确定,在学习率较大时,参数变化大,减小step,多进行通信从而保证快速收敛;在学习率较小时,参数变化小,增大step,减少通信次数,从而提升训练速度。 需要注意的是自适应步长策略中,系统会默认限制最大的同步间隔为 16 step,当公式计算出的间隔大于16 时,按16 steps 进行参数同步。

dist_strategy = fleet.DistributedStrategy()
dist_strategy.adaptive_localsgd = True
dist_strategy.adaptive_localsgd_configs = {
    "init_k_steps": 1,
    "begin_step": 1,
}

optimizer = fluid.optimizer.SGD(learning_rate=0.01)
optimizer = fleet.distributed_optimizer(optimizer, dist_strategy)
optimizer.minimize(model.loss)
开始训练

这一部分和Fleet 中其他任务基本相同:

place = fluid.CUDAPlace(int(os.environ.get('FLAGS_selected_gpus', 0)))
exe = fluid.Executor(place)
exe.run(fluid.default_startup_program())

for i, data in enumerate(loader()):
    start_time = time.time()
    cost_val = exe.run(model.main_prog,
                        feed=data,
                        fetch_list=[model.loss.name])

    end_time = time.time()
    print(
        "worker_index: %d, step%d cost = %f, speed: %f"
        % (fleet.worker_index(), i, cost_val[0], batch_size / (end_time - start_time)))
运行训练脚本

一行启动单机多卡分布式训练:

fleetrun --gpus 0,1,2,3,4,5,6,7 --log_dir log resnet50_localsgd.py

# reader shuffle seed 0
# trainerid, trainer_count 0 8
# read images from 0, length: 160146, lines length: 160146, total: 1281168
# worker_index: 0, step0 cost = 7.151402, speed: 37.698432
# worker_index: 0, step1 cost = 7.112389, speed: 101.518513
# worker_index: 0, step2 cost = 7.004275, speed: 111.062341
# worker_index: 0, step3 cost = 7.039385, speed: 62.173126
# worker_index: 0, step4 cost = 6.985911, speed: 104.058060
# ......

飞桨分布式训练基线报告

Resnet50性能基准

Resnet50是当前视觉领域比较通用的预训练模型后端,同时也作为评价深度学习框架训练性能的最重要模型之一,我们提供Resnet50在ImageNet数据集上的性能基准供用户参考。

软硬件配置情况

基本版本信息

软硬件指标

具体配置

实例类型

百度X-Man 2.0

单实例GPU

8x NVIDIA® Tesla® V100

操作系统

Ubuntu 16.04 LTS with tests run via Docker

CPU

Intel(R) Xeon(R) Gold 6271C CPU @ 2.60GHz

内存

512G

CUDA / CUDNN版本

10.1 / 7.6.5

NCCL / DALI 版本

2.4.7 / 0.24.0

多GPU实例互联信息

InfiniBand 100 Gb/sec

Paddle Github Commit

FleetX Github Commit

硬盘类型

本地SSD硬盘

数据集

ImageNet

评估模型

Resnet50

复现代码地址

Resn et50-Benchmark

Python版本

3.7

硬件拓扑

nvidia-smi topo -m
GPU0    GPU1    GPU2    GPU3    GPU4    GPU5    GPU6    GPU7    mlx5_0  CPU Affinity
GPU0     X      NV2     NV2     NV1     NV1     NODE    NODE    NODE    NODE    0-23
GPU1    NV2      X      NV1     NV1     NODE    NV2     NODE    NODE    NODE    0-23
GPU2    NV2     NV1      X      NV2     NODE    NODE    NV1     NODE    NODE    0-23
GPU3    NV1     NV1     NV2      X      NODE    NODE    NODE    NV2     NODE    0-23
GPU4    NV1     NODE    NODE    NODE     X      NV2     NV2     NV1     NODE    0-23
GPU5    NODE    NV2     NODE    NODE    NV2      X      NV1     NV1     NODE    0-23
GPU6    NODE    NODE    NV1     NODE    NV2     NV1      X      NV2     NODE    0-23
GPU7    NODE    NODE    NODE    NV2     NV1     NV1     NV2      X      NODE    0-23
mlx5_0  NODE    NODE    NODE    NODE    NODE    NODE    NODE    NODE     X

Legend:

X    = Self
SYS  = Connection traversing PCIe as well as the SMP interconnect between NUMA nodes (e.g., QPI/UPI)
NODE = Connection traversing PCIe as well as the interconnect between PCIe Host Bridges within a NUMA node
PHB  = Connection traversing PCIe as well as a PCIe Host Bridge (typically the CPU)
PXB  = Connection traversing multiple PCIe switches (without traversing the PCIe Host Bridge)
PIX  = Connection traversing a single PCIe switch
NV#  = Connection traversing a bonded set of # NVLinks

性能测试方法

  • 硬件资源 采用多机多卡训练,以实例数 x 单实例GPU卡数作为评价标准,评价 1 x 1, 1 x 8, 2 x 8, 4 x 8情况下的性能基准。

  • 训练超参数 批量大小(Batch Size)对训练性能影响最大,因此会对比不同批量大小下模型的训练吞吐。注意,改变批量大小通常需要调整优化算法,但为了对比公平,暂不对优化算法做调整,即不考虑收敛的对比。

  • 测试指标获取方法 当前主流的深度学习框架通常采用异步数据读取,由于训练开始前框架并没有提前开始读取数据,整个训练速度存在一定的IO瓶颈。我们的测试方法是忽略前20个step,然后取后面100个step的平均吞吐作为单次任务的训练吞吐。为了避免硬件的波动(例如网络通信等)对性能的影响,我们会利用相同的配置进行7次运行,取中值。

基准测试结果

  • 单位:Images/s,使用精度FP32,DistributedStrategy如下:

import paddle
import paddle.distributed.fleet as fleet
dist_strategy = fleet.DistributedStrategy()
exec_strategy = fluid.ExecutionStrategy()
exec_strategy.num_threads = 2
exec_strategy.num_iteration_per_drop_scope = 100
dist_strategy.execution_strategy = exec_strategy
build_strategy = fluid.BuildStrategy()
build_strategy.enable_inplace = False
build_strategy.fuse_elewise_add_act_ops = True
build_strategy.fuse_bn_act_ops = True
dist_strategy.build_strategy = build_strategy
dist_strategy.nccl_comm_num = 1

batch / node

1 x 1

1 x 8

2 x 8

4 x 8

32

335.43

2488.49

4629.71

9093.41

64

353.38

2643.75

5325.44

10536.83

128

368.11

2797.31

5635.98

11261.72

  • 单位:Images/s,使用自动混合精度Automatic Mixed Precision(AMP)进行训练,DistributedStrategy如下:

import paddle
import paddle.distributed.fleet as fleet
dist_strategy = fleet.DistributedStrategy()
exec_strategy = fluid.ExecutionStrategy()
exec_strategy.num_threads = 2
exec_strategy.num_iteration_per_drop_scope = 100
dist_strategy.execution_strategy = exec_strategy
build_strategy = fluid.BuildStrategy()
build_strategy.enable_inplace = False
build_strategy.fuse_elewise_add_act_ops = True
build_strategy.fuse_bn_act_ops = True
dist_strategy.build_strategy = build_strategy
dist_strategy.amp = True
dist_strategy.nccl_comm_num = 1

batch / node

1 x 1

1 x 8

2 x 8

4 x 8

32

740.01

4467.82

8628.19

16970.01

64

919.95

6148.98

12071.29

23682.78

128

1018.3

7324.31

14342.03

28397.43

256

1096.5

8166.11

16189.79

32366.39

  • 单位:Images/s, 自动并行模式,DistributedStrategy如下:

import paddle.distributed.fleet as fleet
dist_strategy = fleet.DistributedStrategy()
dist_strategy.auto = True

为了获得更好的性能,我们默认打开了DALI进行数据IO,这里注意单机单开的自动并行开启的选项可能和多卡不同,因此加速比不具备参考意义。

batch / node

1 x 1

1 x 8

2 x 8

4 x 8

32

666.38

4467.82

8711.69

19107.42

64

761

6148.98

12076.77

24314.58

128

890.03

6793.73

13514.66

27277.36

256

938.57

7305.66

14599.55

29361.24

Bert模型训练性能基线

Transformer模型性能基线

VGG16模型训练性能基线

VGG16是当前视觉领域比较通用的预训练模型后端,同时也作为评价深度学习框架训练性能的最重要模型之一,我们提供VGG16在ImageNet数据集上的性能基准供用户参考。

软硬件配置情况

基本版本信息

软硬件指标

具体配置

实例类型

百度X-Man 2.0

单实例GPU

8x NVIDIA® Tesla® V100

操作系统

Ubuntu 16.04 LTS with tests run via Docker

CPU

Intel(R) Xeon(R) Gold 6271C CPU @ 2.60GHz

内存

512G

CUDA / CUDNN版本

10.1 / 7.6.5

NCCL / DALI 版本

2.4.7 / 0.24.0

多GPU实例互联信息

InfiniBand 100 Gb/sec

Paddle Github Commit

b6711e24ea5e17fa24e9b1ba516fe03186dd4a2c

FleetX Github Commit

296e0d71a33a8d250f6626733bc2535465f5f6e0

硬盘类型

本地SSD硬盘

数据集

ImageNet

评估模型

VGG16

复现代码地址

VGG16 -Benchmark

Python版本

3.7

硬件拓扑

nvidia-smi topo -m
GPU0    GPU1    GPU2    GPU3    GPU4    GPU5    GPU6    GPU7    mlx5_0  CPU Affinity
GPU0     X      NV2     NV2     NV1     NV1     NODE    NODE    NODE    NODE    0-23
GPU1    NV2      X      NV1     NV1     NODE    NV2     NODE    NODE    NODE    0-23
GPU2    NV2     NV1      X      NV2     NODE    NODE    NV1     NODE    NODE    0-23
GPU3    NV1     NV1     NV2      X      NODE    NODE    NODE    NV2     NODE    0-23
GPU4    NV1     NODE    NODE    NODE     X      NV2     NV2     NV1     NODE    0-23
GPU5    NODE    NV2     NODE    NODE    NV2      X      NV1     NV1     NODE    0-23
GPU6    NODE    NODE    NV1     NODE    NV2     NV1      X      NV2     NODE    0-23
GPU7    NODE    NODE    NODE    NV2     NV1     NV1     NV2      X      NODE    0-23
mlx5_0  NODE    NODE    NODE    NODE    NODE    NODE    NODE    NODE     X

Legend:

X    = Self
SYS  = Connection traversing PCIe as well as the SMP interconnect between NUMA nodes (e.g., QPI/UPI)
NODE = Connection traversing PCIe as well as the interconnect between PCIe Host Bridges within a NUMA node
PHB  = Connection traversing PCIe as well as a PCIe Host Bridge (typically the CPU)
PXB  = Connection traversing multiple PCIe switches (without traversing the PCIe Host Bridge)
PIX  = Connection traversing a single PCIe switch
NV#  = Connection traversing a bonded set of # NVLinks

性能测试方法

  • 硬件资源 采用多机多卡训练,以实例数 x 单实例GPU卡数作为评价标准,评价 1 x 1, 1 x 8, 2 x 8, 4 x 8情况下的性能基准。

  • 训练超参数 批量大小(Batch Size)对训练性能影响最大,因此会对比不同批量大小下模型的训练吞吐。注意,改变批量大小通常需要调整优化算法,但为了对比公平,暂不对优化算法做调整 ,即不考虑收敛的对比。

  • 测试指标获取方法 当前主流的深度学习框架通常采用异步数据读取,由于训练开始前框架并没有提前开始读取数据,整个训练速度存在一定的IO瓶颈。我们的测试方法是忽略前20个step,然后取后 面100个step的平均吞吐作为单次任务的训练吞吐。为了避免硬件的波动(例如网络通信等)对性能的影响,我们会利用相同的配置进行7次运行,取中值。

基准测试结果

  • 单位:Images/s,使用精度FP32,DistributedStrategy如下:

exec_strategy = fluid.ExecutionStrategy()
dist_strategy = fleet.DistributedStrategy()
exec_strategy.num_threads = 2
exec_strategy.num_iteration_per_drop_scope = 100
dist_strategy.execution_strategy = exec_strategy
build_strategy = fluid.BuildStrategy()
build_strategy.enable_inplace = False
build_strategy.fuse_elewise_add_act_ops = True
build_strategy.fuse_bn_act_ops = True
dist_strategy.build_strategy = build_strategy
dist_strategy.nccl_comm_num = 1

batch / node

1 x 1

1 x 8

2 x 8

4 x 8

32

252.68

1888.20

2873.18

5548.48

64

261.33

1980.04

3900.12

7617.47

128

266.24

2027.06

4028.78

7848.70

  • 单位:Images/s,使用自动混合精度Automatic Mixed Precision(AMP)进行训练,DistributedStrategy如下:

import paddle
import paddle.distributed.fleet as fleet
dist_strategy = fleet.DistributedStrategy()
exec_strategy = fluid.ExecutionStrategy()
exec_strategy.num_threads = 2
exec_strategy.num_iteration_per_drop_scope = 100
dist_strategy.execution_strategy = exec_strategy
build_strategy = fluid.BuildStrategy()
build_strategy.enable_inplace = False
build_strategy.fuse_elewise_add_act_ops = True
build_strategy.fuse_bn_act_ops = True
dist_strategy.build_strategy = build_strategy
dist_strategy.amp = True
dist_strategy.nccl_comm_num = 1

batch / node

1 x 1

1 x 8

2 x 8

4 x 8

32

407.69

3332.17

5136.50

9544.81

64

468.51

3708.32

7112.45

14013.01

128

512.02

3892.58

7618.34

15219.57

256

439.47

3409.96

6779.20

13443.23

  • 单位:Images/s, 自动并行模式,DistributedStrategy如下:

import paddle.distributed.fleet as fleet
dist_strategy = fleet.DistributedStrategy()
dist_strategy.auto = True

为了获得更好的性能,我们默认打开了DALI进行数据IO,这里注意单机单开的自动并行开启的选项可能和多卡不同,因此加速比不具备参考意义。

batch / node

1 x 1

1 x 8

2 x 8

4 x 8

32

409.68

3044.60

4840.74

7668.70

64

455.98

3395.67

6525.20

12237.04

128

472.81

3587.29

7019.13

13562.80

256

407.88

3154.15

6217.92

12147.46

Word2vec模型性能基准

word2vec被广泛运用于NLP及推荐系统领域,同时也作为评价深度学习框架训练性能的重要模型之一,我们提供word2vec在1-billion数据集上的分布式训练的性能基准供用户参考,包含4、8、16、32台服务器联合训练。

软硬件配置情况

基本版本信息

软硬件指标

具体配置

实例类型

纯CPU训练集群

操作系统

Ubuntu 16.04 LTS with tests run via Docker

CPU

Intel(R) Xeon(R) CPU E5-2450 v2 @ 2.50GHz

内存

128G

Paddle Github Commit

FleetX Github Commit

硬盘类型

本地SSD硬盘

数据集

1-billion

评估模型

Work2vec

复现代码地址

Word 2vec-Benchmark

Python版本

3.7

性能测试方法

  • 硬件资源 采用多机多进程训练,每一台服务器实例均启动一个pserver,一个trainer,每个trainer配置固定数量的线程,以实例数作为评价标准,评价 4, 8, 16, 32情况下的性能基准。

  • 训练超参数 批量大小(Batch Size)对训练性能影响最大,因此会对比不同批量大小下模型的训练吞吐。注意,改变批量大小通常需要调整优化算法,但为了对比公平,暂不对优化算法做调整>,即不考虑收敛的对比。

  • 测试指标获取方法 当前主流的深度学习框架通常采用异步数据读取,由于训练开始前框架并没有提前开始读取数据,整个训练速度存在一定的IO瓶颈。我们的测试方法是统计一轮训练中总的字数,用一轮总字数除总耗时得到的平均吞吐作为单次任务的训练吞吐。为了避免硬件的波动(例如网络通信等)对性能的影响,我们会利用相同的配置进行7次运行,取中值。

基准测试结果

  • 单位:Images/s,使用精度FP32,DistributedStrategy如下:

import paddle
import paddle.distributed.fleet as fleet

dist_strategy = fleet.DistributedStrategy()
dist_strategy.a_sync=True
dist_strategy.a_sync_configs = {"k_steps": 100}

batch / node

4

8

16

32

100

55597.5

55082.37

53302.63

47280.91

用户FAQ

  • TBA


FleetX使用Apache License 2.0开源协议