MMDataParallel
mmdetection:MMDataParallel
1.前言
mmdetection为了利用多GPU,在mmcv中实现了MMDataParallel和MMDistributedDataParallel。有没有发现这两者的命名和pytorch中的DataParallel和DistributedDataParallel命名方式很相似。没错,mmcv中的dataparallel就是继承了pytorch中的dataparallel。
下面简单讲一下,DataParallel和DistributedDataParallel两者的区别,DataParallel实现的是单进程多线程,DistributedDataParallel实现的是多进程。总而言之,DistributedDataParallel实现了真正的分布式并发计算,很好地利用多进程,并且GPU间通信开销更小。关于他们的详细区别,可见PyTorch 源码解读之 DP & DDP。写的非常详细,好文要顶。
对于计算密集任务,python的多进程要比多线程更好,熟悉python并发编程,肯定听说过python的GIL锁机制,导致多线程无法利用多核cpu。
2.MMDataParallel简介
本文将介绍MMDataParallel,建议先了解一下pytorch的DataParallel。这里就简单介绍一下pytorch的DataParallel。
torch.nn.DataParallel(module, device_ids=None, output_device=None, dim=0)
功能:包装模型,利用多线程实现分发并行机制。可以把数据平均分发到各个 GPU
上,每个 GPU
实际的数据量为batch/gpu_num,实现并行计算。具体流程如下:
- 各GPU卡分别计算损失和梯度
- 所有梯度整合到 device[0](即主GPU)
- device[0] 进行参数更新,其他卡拉取 device[0] 的参数进行更新
构造参数:
- module:需要包装分发的模型
- device_ids:可分发的 GPU,为None则默认分发到所有可见的 GPU(即CUDA_VISIBLE_DEVICES)。其中主GPU为device_ids[0],所以当为device_ids=None时,即为所有可见GPU的第一个。
注意!:使用 DataParallel时,要把数据和模型放在主GPU上,主GPU负责分发数据
MMDataParallel继承于DataParallel,主要做了两个变化:
- 重写scatter方法(这步很重要,不仅要支持对DataContainer的解封装,还要将数据放到对应的GPU上)
- 实现train_step和val_step两个api接口,供mmcv.runner调用
注意:MMDataParallel的train_step和val_step方法只支持单GPU,forward方法是可以支持多GPU。train_step和val_step会在训练时被runner调用,forward会在test、inference时被调用。
from itertools import chain。
from torch.nn.parallel import DataParallel
from .scatter_gather import scatter_kwargs
class MMDataParallel(DataParallel): # 继承于pytorch.DataParallel
def __init__(self, *args, dim=0, **kwargs):
# 构造函数和pytorch的DataParallel一致
super(MMDataParallel, self).__init__(*args, dim=dim, **kwargs)
self.dim = dim
def forward(self, *inputs, **kwargs):
# 在api/test.py和api/inference.py中调用model(return_loss=False, rescale=True, **data_batch)
# 所以实际上,这里参数只使用了kwargs,inputs为空tuple()
if not self.device_ids:
# 在cpu下
# 因为pytorch的Dataparallel.forward不会对cpu进行scatter,所以这里要判断一下
inputs, kwargs = self.scatter(inputs, kwargs, [-1])
return self.module(*inputs[0], **kwargs[0])
else:
return super().forward(*inputs, **kwargs)
# pytorch的forward,实现了把数据平均分发到各个 GPU 上,每个 GPU 实际的数据量为batch/gpu_num
def scatter(self, inputs, kwargs, device_ids): # 非常重要
return scatter_kwargs(inputs, kwargs, device_ids, dim=self.dim)
def train_step(self, *inputs, **kwargs):
# 参数就使用了inputs,为(data_loader[i])
# kwargs为空字典{}
if not self.device_ids:
inputs, kwargs = self.scatter(inputs, kwargs, [-1])
return self.module.train_step(*inputs[0], **kwargs[0]) # 调用真正的module
# 需要注意的是MMDataParallel只支持单GPU,MMDistributedDataParallel才支持多GPU
assert len(self.device_ids) == 1, \
('MMDataParallel only supports single GPU training, if you need to'
' train with multiple GPUs, please use MMDistributedDataParallel'
' instead.')
for t in chain(self.module.parameters(), self.module.buffers()):
# 遍历parameter和buffer,parameter记录需要BP更新的参数,buffer记录不需要BP更新的参数
# 还记得吗,使用 DataParallel时,要把数据和模型放在主GPU上。这里就是判断模型是不是在主GPU上
if t.device != self.src_device_obj:
raise RuntimeError(
'module must have its parameters and buffers '
f'on device {self.src_device_obj} (device_ids[0]) but '
f'found one of them on device: {t.device}')
inputs, kwargs = self.scatter(inputs, kwargs, self.device_ids)
return self.module.train_step(*inputs[0], **kwargs[0])
def val_step(self, *inputs, **kwargs):
# 参数就使用了inputs,为(data_loader[i])
# kwargs为空字典{}
if not self.device_ids:
inputs, kwargs = self.scatter(inputs, kwargs, [-1])
return self.module.val_step(*inputs[0], **kwargs[0])
# 需要注意的是MMDataParallel只支持单GPU,MMDistributedDataParallel才支持多GPU
assert len(self.device_ids) == 1, \
('MMDataParallel only supports single GPU training, if you need to'
' train with multiple GPUs, please use MMDistributedDataParallel'
' instead.')
for t in chain(self.module.parameters(), self.module.buffers()):
if t.device != self.src_device_obj:
raise RuntimeError(
'module must have its parameters and buffers '
f'on device {self.src_device_obj} (device_ids[0]) but '
f'found one of them on device: {t.device}')
inputs, kwargs = self.scatter(inputs, kwargs, self.device_ids)
return self.module.val_step(*inputs[0], **kwargs[0]) # 调用真正的module
MMDataParallel的关键点在于重写了self.scatter这个方法,不仅要支持对DataContainer的解封装,还要将数据放到对应的GPU上。
MMDataParallel的输入就是dataloader的输出,先回忆一下mmdetection源码阅读笔记:数据分发文章中介绍的dataloder的输出:以训练时为例
# num_gpus就是分布式训练时的gpu数量,默认为1
dataloader[i] = dict('img':DC(list[ num_gpus * tensor(Batch,C,H,W) ],stacked=True,cpu_only=False),
'img_metas':DC(list[ num_gpus * list[Batch*dict('flip','ori_shape'……)] ],stacked=False,cpu_only=True),
'gt_bboxes':DC(list[ num_gpus * list[Batch*tensor] ],stacked=Fasle,cpu_only=False),
'gt_labels':DC(list[ num_gpus * list[Batch*tensor] ],stacked=False,cpu_only=False)
)
scatter就是要对上述进行解封装,下面正式介绍重写后的scatter的具体实现:
import torch
from torch.nn.parallel._functions import Scatter as OrigScatter
from ._functions import Scatter
from .data_container import DataContainer
def scatter(inputs, target_gpus, dim=0):
# 主要就是通过递归调用scatter_map来进行解封装
def scatter_map(obj):
if isinstance(obj, torch.Tensor):
if target_gpus != [-1]:
return OrigScatter.apply(target_gpus, None, dim, obj) # gpu下,调用pytorch的scatter
else:
return Scatter.forward(target_gpus, obj) # cpu下,只是加了一层tuple,可以直接替换为return (obj,)!
if isinstance(obj, DataContainer):
if obj.cpu_only:
return obj.data
else:
return Scatter.forward(target_gpus, obj.data) # 这一步是把将数据放到对应的GPU上!!
if isinstance(obj, tuple) and len(obj) > 0:
return list(zip(*map(scatter_map, obj)))
if isinstance(obj, list) and len(obj) > 0:
out = list(map(list, zip(*map(scatter_map, obj))))
return out
if isinstance(obj, dict) and len(obj) > 0:
out = list(map(type(obj), zip(*map(scatter_map, obj.items()))))
return out
return [obj for targets in target_gpus] # 其他类型
try:
return scatter_map(inputs)
finally:
# 这步操作是为了避免,递归调用闭包函数引起的循环引用,进而导致内存泄漏
# 嘶,不是很懂,为什么会循环引用,请大佬指教!
scatter_map = None
def scatter_kwargs(inputs, kwargs, target_gpus, dim=0):
inputs = scatter(inputs, target_gpus, dim) if inputs else []
kwargs = scatter(kwargs, target_gpus, dim) if kwargs else []
# 保持数量一致
if len(inputs) < len(kwargs):
inputs.extend([() for _ in range(len(kwargs) - len(inputs))])
elif len(kwargs) < len(inputs):
kwargs.extend([{} for _ in range(len(inputs) - len(kwargs))])
inputs = tuple(inputs)
kwargs = tuple(kwargs)
return inputs, kwargs
训练时,dataloader[i]经过解封装后,变为:
# num_gpus就是分布式训练时的gpu数量,默认为1
tuple(num_gpus *
dict('img': tensor(Batch,C,H,W) ,
'img_metas': list[Batch*dict('flip','ori_shape'……)],
'gt_bboxes': list[Batch*tensor],
'gt_labels': list[Batch*tensor]
)
)
同样的,采用MultiScaleFlipAug时,解封装后,变为:
# num_augs就是一张图片经过数据增强后的数据数量
tuple(num_gpus *
dict('img': list[ num_augs * tensor(Batch,C,H,W) ],
'img_metas': list[ num_augs * list[Batch*dict('flip','ori_shape'……)] ],
'gt_bboxes': list[ num_augs * list[Batch*tensor] ],
'gt_labels': list[ num_augs * list[Batch*tensor] ],
'return_loss':False,
'rescale':True,
)
)
# 需要注意的是,num_augs大于1时,batch必须为1,num_augs等于1时,batch可以大于1。
解封装的过程告一段落,下面就剩下Scatter.forward实现将数据放到对应GPU上了。
Scatter.forward定义在_function.py中:
在这里主要有一个小技巧,当需要大量把数据从内存放到显存上的操作时,是件耗时的事情,我们应该用异步操作,即kernel会被发射到device的某个Stream上排队,CPU继续异步执行。这一切都通过cuda的stream进行封装,但网上讲这个cuda.Stream的使用太少,我觉得不用再深入了解。
import torch
from torch.nn.parallel._functions import _get_stream
def scatter(input, devices, streams=None):
if streams is None:
streams = [None] * len(devices)
if isinstance(input, list):
chunk_size = (len(input) - 1) // len(devices) + 1
outputs = [ # 这里是关键,就是在做分配,把数据分配到相应GPU上
scatter(input[i], [devices[i // chunk_size]],
[streams[i // chunk_size]]) for i in range(len(input))
]
return outputs
elif isinstance(input, torch.Tensor):
output = input.contiguous()
# TODO: copy to a pinned buffer first (if copying from CPU)
stream = streams[0] if output.numel() > 0 else None
if devices != [-1]:
with torch.cuda.device(devices[0]), torch.cuda.stream(stream): # 开启stream流
output = output.cuda(devices[0], non_blocking=True)
return output
else:
raise Exception(f'Unknown type {type(input)}.')
def synchronize_stream(output, devices, streams): # 同步
if isinstance(output, list):
chunk_size = len(output) // len(devices)
for i in range(len(devices)):
for j in range(chunk_size):
synchronize_stream(output[i * chunk_size + j], [devices[i]],[streams[i]])
elif isinstance(output, torch.Tensor):
if output.numel() != 0:
with torch.cuda.device(devices[0]):
main_stream = torch.cuda.current_stream()
main_stream.wait_stream(streams[0])
output.record_stream(main_stream)
else:
raise Exception(f'Unknown type {type(output)}.')
def get_input_device(input):
if isinstance(input, list):
for item in input:
input_device = get_input_device(item)
if input_device != -1:
return input_device
return -1
elif isinstance(input, torch.Tensor):
return input.get_device() if input.is_cuda else -1
else:
raise Exception(f'Unknown type {type(input)}.')
class Scatter:
@staticmethod
def forward(target_gpus, input):
input_device = get_input_device(input)
streams = None
if input_device == -1 and target_gpus != [-1]:
streams = [_get_stream(device) for device in target_gpus] # CPU到GPU的stream流
outputs = scatter(input, target_gpus, streams)
if streams is not None:
synchronize_stream(outputs, target_gpus, streams) # 同步stream流
return tuple(outputs) if isinstance(outputs,list) else (outputs,)
# 注意!如果outputs是tensor,那么tuple(outputs)会降维,应该改为(outputs,)
pytorch的_get_stream具体实现为:
# 维护全局的Stream流
_streams = None
def _get_stream(device: int):
"""Gets a background stream for copying between CPU and GPU"""
global _streams
if device == -1:
return None
if _streams is None:
_streams = [None] * torch.cuda.device_count()
if _streams[device] is None:
_streams[device] = torch.cuda.Stream(device) # 创建cpu到device的Stream
return _streams[device]