mmdetection:MMDataParallel

1.前言

mmdetection为了利用多GPU,在mmcv中实现了MMDataParallel和MMDistributedDataParallel。有没有发现这两者的命名和pytorch中的DataParallel和DistributedDataParallel命名方式很相似。没错,mmcv中的dataparallel就是继承了pytorch中的dataparallel。

下面简单讲一下,DataParallel和DistributedDataParallel两者的区别,DataParallel实现的是单进程多线程,DistributedDataParallel实现的是多进程。总而言之,DistributedDataParallel实现了真正的分布式并发计算,很好地利用多进程,并且GPU间通信开销更小。关于他们的详细区别,可见PyTorch 源码解读之 DP & DDP。写的非常详细,好文要顶。

对于计算密集任务,python的多进程要比多线程更好,熟悉python并发编程,肯定听说过python的GIL锁机制,导致多线程无法利用多核cpu。


2.MMDataParallel简介

本文将介绍MMDataParallel,建议先了解一下pytorch的DataParallel。这里就简单介绍一下pytorch的DataParallel。

torch.nn.DataParallel(module, device_ids=None, output_device=None, dim=0) 功能:包装模型,利用多线程实现分发并行机制。可以把数据平均分发到各个 GPU 上,每个 GPU 实际的数据量为batch/gpu_num,实现并行计算。具体流程如下:

  1. 各GPU卡分别计算损失和梯度
  2. 所有梯度整合到 device[0](即主GPU)
  3. device[0] 进行参数更新,其他卡拉取 device[0] 的参数进行更新

构造参数:

  • module:需要包装分发的模型
  • device_ids:可分发的 GPU,为None则默认分发到所有可见的 GPU(即CUDA_VISIBLE_DEVICES)。其中主GPU为device_ids[0],所以当为device_ids=None时,即为所有可见GPU的第一个。

注意!:使用 DataParallel时,要把数据和模型放在主GPU上,主GPU负责分发数据

MMDataParallel继承于DataParallel,主要做了两个变化:

  • 重写scatter方法(这步很重要,不仅要支持对DataContainer的解封装,还要将数据放到对应的GPU上)
  • 实现train_step和val_step两个api接口,供mmcv.runner调用

注意:MMDataParallel的train_step和val_step方法只支持单GPU,forward方法是可以支持多GPU。train_step和val_step会在训练时被runner调用,forward会在test、inference时被调用。

from itertools import chain。
from torch.nn.parallel import DataParallel
from .scatter_gather import scatter_kwargs

class MMDataParallel(DataParallel): # 继承于pytorch.DataParallel
    def __init__(self, *args, dim=0, **kwargs):
        # 构造函数和pytorch的DataParallel一致
        super(MMDataParallel, self).__init__(*args, dim=dim, **kwargs)
        self.dim = dim

    def forward(self, *inputs, **kwargs):
        # 在api/test.py和api/inference.py中调用model(return_loss=False, rescale=True, **data_batch)
        # 所以实际上,这里参数只使用了kwargs,inputs为空tuple()

        if not self.device_ids: 
            # 在cpu下
            # 因为pytorch的Dataparallel.forward不会对cpu进行scatter,所以这里要判断一下
            inputs, kwargs = self.scatter(inputs, kwargs, [-1])
            return self.module(*inputs[0], **kwargs[0])
        else:
            return super().forward(*inputs, **kwargs) 
            # pytorch的forward,实现了把数据平均分发到各个 GPU 上,每个 GPU 实际的数据量为batch/gpu_num

    def scatter(self, inputs, kwargs, device_ids): # 非常重要
        return scatter_kwargs(inputs, kwargs, device_ids, dim=self.dim) 

    def train_step(self, *inputs, **kwargs):
        # 参数就使用了inputs,为(data_loader[i])
        # kwargs为空字典{}
        if not self.device_ids:
            inputs, kwargs = self.scatter(inputs, kwargs, [-1])
            return self.module.train_step(*inputs[0], **kwargs[0]) # 调用真正的module
        # 需要注意的是MMDataParallel只支持单GPU,MMDistributedDataParallel才支持多GPU
        assert len(self.device_ids) == 1, \
            ('MMDataParallel only supports single GPU training, if you need to'
             ' train with multiple GPUs, please use MMDistributedDataParallel'
             ' instead.')

        for t in chain(self.module.parameters(), self.module.buffers()): 
            # 遍历parameter和buffer,parameter记录需要BP更新的参数,buffer记录不需要BP更新的参数
            # 还记得吗,使用 DataParallel时,要把数据和模型放在主GPU上。这里就是判断模型是不是在主GPU上
            if t.device != self.src_device_obj:
                raise RuntimeError(
                    'module must have its parameters and buffers '
                    f'on device {self.src_device_obj} (device_ids[0]) but '
                    f'found one of them on device: {t.device}')

        inputs, kwargs = self.scatter(inputs, kwargs, self.device_ids)
        return self.module.train_step(*inputs[0], **kwargs[0])

    def val_step(self, *inputs, **kwargs):
        # 参数就使用了inputs,为(data_loader[i])
        # kwargs为空字典{}
        if not self.device_ids:
            inputs, kwargs = self.scatter(inputs, kwargs, [-1])
            return self.module.val_step(*inputs[0], **kwargs[0])
        # 需要注意的是MMDataParallel只支持单GPU,MMDistributedDataParallel才支持多GPU
        assert len(self.device_ids) == 1, \
            ('MMDataParallel only supports single GPU training, if you need to'
             ' train with multiple GPUs, please use MMDistributedDataParallel'
             ' instead.')

        for t in chain(self.module.parameters(), self.module.buffers()):
            if t.device != self.src_device_obj:
                raise RuntimeError(
                    'module must have its parameters and buffers '
                    f'on device {self.src_device_obj} (device_ids[0]) but '
                    f'found one of them on device: {t.device}')

        inputs, kwargs = self.scatter(inputs, kwargs, self.device_ids)
        return self.module.val_step(*inputs[0], **kwargs[0]) # 调用真正的module

MMDataParallel的关键点在于重写了self.scatter这个方法,不仅要支持对DataContainer的解封装,还要将数据放到对应的GPU上

MMDataParallel的输入就是dataloader的输出,先回忆一下mmdetection源码阅读笔记:数据分发文章中介绍的dataloder的输出:以训练时为例

# num_gpus就是分布式训练时的gpu数量,默认为1
dataloader[i] = dict('img':DC(list[ num_gpus * tensor(Batch,C,H,W) ],stacked=True,cpu_only=False),
     'img_metas':DC(list[ num_gpus * list[Batch*dict('flip','ori_shape'……)] ],stacked=False,cpu_only=True),
      'gt_bboxes':DC(list[ num_gpus * list[Batch*tensor] ],stacked=Fasle,cpu_only=False),
      'gt_labels':DC(list[ num_gpus * list[Batch*tensor] ],stacked=False,cpu_only=False)
    )

scatter就是要对上述进行解封装,下面正式介绍重写后的scatter的具体实现:

import torch
from torch.nn.parallel._functions import Scatter as OrigScatter
from ._functions import Scatter
from .data_container import DataContainer

def scatter(inputs, target_gpus, dim=0):
    # 主要就是通过递归调用scatter_map来进行解封装
    def scatter_map(obj):
        if isinstance(obj, torch.Tensor):
            if target_gpus != [-1]:
                return OrigScatter.apply(target_gpus, None, dim, obj) # gpu下,调用pytorch的scatter
            else:
                return Scatter.forward(target_gpus, obj) # cpu下,只是加了一层tuple,可以直接替换为return (obj,)!
        if isinstance(obj, DataContainer):
            if obj.cpu_only:
                return obj.data
            else:
                return Scatter.forward(target_gpus, obj.data) # 这一步是把将数据放到对应的GPU上!!
        if isinstance(obj, tuple) and len(obj) > 0:
            return list(zip(*map(scatter_map, obj)))
        if isinstance(obj, list) and len(obj) > 0:
            out = list(map(list, zip(*map(scatter_map, obj))))
            return out
        if isinstance(obj, dict) and len(obj) > 0:
            out = list(map(type(obj), zip(*map(scatter_map, obj.items()))))
            return out
        return [obj for targets in target_gpus] # 其他类型

    try:
        return scatter_map(inputs)
    finally:
        # 这步操作是为了避免,递归调用闭包函数引起的循环引用,进而导致内存泄漏
        # 嘶,不是很懂,为什么会循环引用,请大佬指教!
        scatter_map = None

def scatter_kwargs(inputs, kwargs, target_gpus, dim=0):
    inputs = scatter(inputs, target_gpus, dim) if inputs else []
    kwargs = scatter(kwargs, target_gpus, dim) if kwargs else []
    # 保持数量一致
    if len(inputs) < len(kwargs):
        inputs.extend([() for _ in range(len(kwargs) - len(inputs))])
    elif len(kwargs) < len(inputs):
        kwargs.extend([{} for _ in range(len(inputs) - len(kwargs))])
    inputs = tuple(inputs)
    kwargs = tuple(kwargs)
    return inputs, kwargs

训练时,dataloader[i]经过解封装后,变为:

# num_gpus就是分布式训练时的gpu数量,默认为1
tuple(num_gpus * 
        dict('img': tensor(Batch,C,H,W) ,
             'img_metas': list[Batch*dict('flip','ori_shape'……)],
             'gt_bboxes': list[Batch*tensor],
             'gt_labels': list[Batch*tensor]
            )
      )

同样的,采用MultiScaleFlipAug时,解封装后,变为:

# num_augs就是一张图片经过数据增强后的数据数量
tuple(num_gpus * 
         dict('img': list[ num_augs * tensor(Batch,C,H,W) ],
              'img_metas': list[ num_augs * list[Batch*dict('flip','ori_shape'……)] ],
              'gt_bboxes': list[ num_augs * list[Batch*tensor] ],
              'gt_labels': list[ num_augs * list[Batch*tensor] ],
              'return_loss':False,
              'rescale':True,
              )
     )
# 需要注意的是,num_augs大于1时,batch必须为1,num_augs等于1时,batch可以大于1。

解封装的过程告一段落,下面就剩下Scatter.forward实现将数据放到对应GPU上了。

Scatter.forward定义在_function.py中:

在这里主要有一个小技巧,当需要大量把数据从内存放到显存上的操作时,是件耗时的事情,我们应该用异步操作,即kernel会被发射到device的某个Stream上排队,CPU继续异步执行。这一切都通过cuda的stream进行封装,但网上讲这个cuda.Stream的使用太少,我觉得不用再深入了解。

import torch
from torch.nn.parallel._functions import _get_stream

def scatter(input, devices, streams=None):
    if streams is None:
        streams = [None] * len(devices)

    if isinstance(input, list):
        chunk_size = (len(input) - 1) // len(devices) + 1
        outputs = [ # 这里是关键,就是在做分配,把数据分配到相应GPU上
            scatter(input[i], [devices[i // chunk_size]],
                    [streams[i // chunk_size]]) for i in range(len(input))
        ]
        return outputs
    elif isinstance(input, torch.Tensor):
        output = input.contiguous()
        # TODO: copy to a pinned buffer first (if copying from CPU)
        stream = streams[0] if output.numel() > 0 else None
        if devices != [-1]:
            with torch.cuda.device(devices[0]), torch.cuda.stream(stream): # 开启stream流
                output = output.cuda(devices[0], non_blocking=True)
        return output
    else:
        raise Exception(f'Unknown type {type(input)}.')

def synchronize_stream(output, devices, streams): # 同步
    if isinstance(output, list):
        chunk_size = len(output) // len(devices)
        for i in range(len(devices)):
            for j in range(chunk_size):
                synchronize_stream(output[i * chunk_size + j], [devices[i]],[streams[i]])
    elif isinstance(output, torch.Tensor):
        if output.numel() != 0:
            with torch.cuda.device(devices[0]):
                main_stream = torch.cuda.current_stream()
                main_stream.wait_stream(streams[0])
                output.record_stream(main_stream)
    else:
        raise Exception(f'Unknown type {type(output)}.')


def get_input_device(input):
    if isinstance(input, list):
        for item in input:
            input_device = get_input_device(item)
            if input_device != -1:
                return input_device
        return -1
    elif isinstance(input, torch.Tensor):
        return input.get_device() if input.is_cuda else -1
    else:
        raise Exception(f'Unknown type {type(input)}.')


class Scatter:
    @staticmethod
    def forward(target_gpus, input):
        input_device = get_input_device(input) 
        streams = None
        if input_device == -1 and target_gpus != [-1]:
            streams = [_get_stream(device) for device in target_gpus] # CPU到GPU的stream流

        outputs = scatter(input, target_gpus, streams)
        if streams is not None:
            synchronize_stream(outputs, target_gpus, streams) # 同步stream流

        return tuple(outputs) if isinstance(outputs,list) else (outputs,)
        # 注意!如果outputs是tensor,那么tuple(outputs)会降维,应该改为(outputs,)

pytorch的_get_stream具体实现为:

# 维护全局的Stream流
_streams = None

def _get_stream(device: int):
    """Gets a background stream for copying between CPU and GPU"""
    global _streams
    if device == -1:
        return None
    if _streams is None:
        _streams = [None] * torch.cuda.device_count()
    if _streams[device] is None:
        _streams[device] = torch.cuda.Stream(device) # 创建cpu到device的Stream
    return _streams[device]