DETR Code Walkthrough
I. Source Code
https://github.com/facebookresearch/detr
II. Code Analysis
1. Preface
Two-dimensional position encoding:
The position matrices x_embed and y_embed are built with the Python function cumsum, which takes a cumulative sum over the elements of a matrix. After accumulation, the last element is the sum of all accumulated elements, so a separate summation step is unnecessary; that sum is used directly for normalization, which is what x_embed[:,:,-1:] and y_embed[:,-1:,:] refer to.
Shapes of some variables in the code:
tensor_list is of type NestedTensor; it carries a mask internally to represent dynamic shapes (a new PyTorch tensor feature). Here the mask is all False.
x: (b, c, H, W)
mask: (b, H, W), all False.
not_mask: (b, H, W), all True.
y_embed, when it first appears: (b, H, W), with values 1,1,1,1,......,2,2,2,2,......,3,3,3,3,......
x_embed, when it first appears: (b, H, W), with values 1,2,3,4,......,1,2,3,4,......,1,2,3,4,......
self.num_pos_feats = 128
dim_t, when it first appears, = [0,1,2,3,......,127]
pos_x: (b, H, W, 128)
pos_y: (b, H, W, 128)
The number passed to flatten() indicates the axis from which the flattening starts.
torch.stack((pos_y[:,:,:,0::2].sin(), pos_y[:,:,:,1::2].cos()), dim=4)
After this step the shape is (b, H, W, 64, 2); flatten(3) then flattens from axis 3 onward, giving (b, H, W, 128).
torch.cat((pos_y, pos_x), dim=3) then gives (b, H, W, 256), which is finally permuted to (b, 256, H, W).
The PositionEmbeddingSine class inherits from nn.Module.
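As a minimal, self-contained sketch (not code from the repository), the following reproduces the y_embed / x_embed patterns listed above on a tiny all-True not_mask:

import torch

# a 1x3x4 not_mask: a batch of one feature map where every position is real (True)
not_mask = torch.ones(1, 3, 4, dtype=torch.bool)

y_embed = not_mask.cumsum(1, dtype=torch.float32)  # accumulate along H
x_embed = not_mask.cumsum(2, dtype=torch.float32)  # accumulate along W

print(y_embed[0])
# tensor([[1., 1., 1., 1.],
#         [2., 2., 2., 2.],
#         [3., 3., 3., 3.]])
print(x_embed[0])
# tensor([[1., 2., 3., 4.],
#         [1., 2., 3., 4.],
#         [1., 2., 3., 4.]])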
2. Position Encoding
class PositionEmbeddingSine(nn.Module):
    def forward(self, tensor_list: NestedTensor):
        # the input is (b, c, h, w)
        # tensor_list is a NestedTensor, which carries a mask internally,
        # used to represent dynamic shapes; a new tensor feature in PyTorch:
        # https://github.com/pytorch/nestedtensor
        x = tensor_list.tensors
        # the attached mask, shape (b, h, w), all False here
        mask = tensor_list.mask
        assert mask is not None
        not_mask = ~mask
        # since the image is 2D, the position encoding is also split into x and y directions
        # 1 1 1 1 .. 2 2 2 2 ... 3 3 3 ...
        y_embed = not_mask.cumsum(1, dtype=torch.float32)
        # 1 2 3 4 ... 1 2 3 4 ...
        x_embed = not_mask.cumsum(2, dtype=torch.float32)
        if self.normalize:
            eps = 1e-6
            y_embed = y_embed / (y_embed[:, -1:, :] + eps) * self.scale
            x_embed = x_embed / (x_embed[:, :, -1:] + eps) * self.scale
        # num_pos_feats = 128
        # 0~127; self.num_pos_feats = 128 because the final embedding is 256-d:
        # 128 dims for y and 128 for x, each built from interleaved sin/cos
        dim_t = torch.arange(self.num_pos_feats, dtype=torch.float32, device=x.device)
        dim_t = self.temperature ** (2 * (dim_t // 2) / self.num_pos_feats)
        # output shape = (b, h, w, 128)
        pos_x = x_embed[:, :, :, None] / dim_t
        pos_y = y_embed[:, :, :, None] / dim_t
        pos_x = torch.stack((pos_x[:, :, :, 0::2].sin(), pos_x[:, :, :, 1::2].cos()), dim=4).flatten(3)
        pos_y = torch.stack((pos_y[:, :, :, 0::2].sin(), pos_y[:, :, :, 1::2].cos()), dim=4).flatten(3)
        pos = torch.cat((pos_y, pos_x), dim=3).permute(0, 3, 1, 2)
        # every (x, y) position of the feature map is encoded into a 256-d vector:
        # the first 128 dims are the y-direction encoding, the last 128 the x-direction encoding
        # returned shape: (b, 256, h, w)
        return pos
The author defines a data structure, NestedTensor, which packs two variables together: x and mask. x is the tensor data, and mask records whether each position of x is padding.
not_mask is the inverse of mask: it marks whether a given (x, y) location of the feature map holds a real value rather than padding.
The image is then encoded. Because the encoding is two-dimensional rather than one-dimensional, both the x and y directions have to be encoded; cumsum() is used to build the encoding from cumulative sums, producing the x_embed and y_embed patterns shown above.
After normalization, the corresponding x and y encodings are obtained; finally cat() joins the two 128-dimensional vectors into a 256-dimensional vector, which is returned as the final encoding pos.
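To make the mask semantics concrete, here is a minimal sketch (not the repository's util/misc.py code) that batches two images of different sizes into one padded tensor plus a mask that is True exactly at the padded positions; this is the pair of values a NestedTensor packs together:

import torch

# two images of different spatial sizes (C = 3)
imgs = [torch.rand(3, 20, 30), torch.rand(3, 24, 16)]

# pad both to the per-batch maximum size and record which pixels are padding
max_h = max(img.shape[1] for img in imgs)
max_w = max(img.shape[2] for img in imgs)
batch = torch.zeros(len(imgs), 3, max_h, max_w)
mask = torch.ones(len(imgs), max_h, max_w, dtype=torch.bool)  # True = padding
for i, img in enumerate(imgs):
    c, h, w = img.shape
    batch[i, :, :h, :w] = img
    mask[i, :h, :w] = False  # real pixels

# (batch, mask) is what a NestedTensor stores; not_mask = ~mask marks the real
# pixels, and it is this not_mask that cumsum() runs over in the code above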
3. Backbone
class BackboneBase(nn.Module):
    def __init__(self, backbone: nn.Module, train_backbone: bool, num_channels: int, return_interm_layers: bool):
        super().__init__()
        for name, parameter in backbone.named_parameters():
            if not train_backbone or 'layer2' not in name and 'layer3' not in name and 'layer4' not in name:
                parameter.requires_grad_(False)
        if return_interm_layers:
            return_layers = {"layer1": "0", "layer2": "1", "layer3": "2", "layer4": "3"}
        else:
            return_layers = {'layer4': "0"}
        # the wrapped model is the nn.Module backbone passed to BackboneBase;
        # which layers are returned is controlled by the bool return_interm_layers
        self.body = IntermediateLayerGetter(backbone, return_layers=return_layers)
        self.num_channels = num_channels

    def forward(self, tensor_list: NestedTensor):
        # the input to BackboneBase is a NestedTensor
        # xs holds the intermediate-layer outputs
        xs = self.body(tensor_list.tensors)
        out: Dict[str, NestedTensor] = {}
        for name, x in xs.items():
            m = tensor_list.mask
            assert m is not None
            # F.interpolate resamples the mask to the size of the feature map
            # to(torch.bool) converts the mask back to a bool tensor
            mask = F.interpolate(m[None].float(), size=x.shape[-2:]).to(torch.bool)[0]
            out[name] = NestedTensor(x, mask)
        return out


class Backbone(BackboneBase):
    """ResNet backbone with frozen BatchNorm."""
    def __init__(self, name: str,
                 train_backbone: bool,
                 return_interm_layers: bool,
                 dilation: bool):
        # choose the backbone, num_channels, return_interm_layers, etc. by name,
        # then pass them to BackboneBase for initialization
        backbone = getattr(torchvision.models, name)(
            replace_stride_with_dilation=[False, False, dilation],
            pretrained=is_main_process(), norm_layer=FrozenBatchNorm2d)
        num_channels = 512 if name in ('resnet18', 'resnet34') else 2048
        super().__init__(backbone, train_backbone, num_channels, return_interm_layers)
DETR uses ResNet as the backbone, with its BatchNorm layers replaced by FrozenBatchNorm2d.
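The docstring above mentions frozen BatchNorm. As a rough sketch of the idea only (not a copy of the repo's FrozenBatchNorm2d), the batch statistics and affine parameters are registered as buffers, so they keep their pretrained values and are never touched by the optimizer:

import torch
import torch.nn as nn

class FrozenBatchNorm2dSketch(nn.Module):
    # weight, bias, running_mean and running_var are buffers, not parameters,
    # so requires_grad is never set and the optimizer never sees them
    def __init__(self, num_features, eps=1e-5):
        super().__init__()
        self.register_buffer("weight", torch.ones(num_features))
        self.register_buffer("bias", torch.zeros(num_features))
        self.register_buffer("running_mean", torch.zeros(num_features))
        self.register_buffer("running_var", torch.ones(num_features))
        self.eps = eps

    def forward(self, x):
        # a plain affine transform using the frozen statistics
        scale = self.weight * (self.running_var + self.eps).rsqrt()
        bias = self.bias - self.running_mean * scale
        return x * scale.reshape(1, -1, 1, 1) + bias.reshape(1, -1, 1, 1)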
class Joiner(nn.Sequential):
    def __init__(self, backbone, position_embedding):
        super().__init__(backbone, position_embedding)

    def forward(self, tensor_list: NestedTensor):
        xs = self[0](tensor_list)
        out: List[NestedTensor] = []
        pos = []
        for name, x in xs.items():
            out.append(x)
            # position encoding
            pos.append(self[1](x).to(x.tensors.dtype))
        return out, pos


def build_backbone(args):
    # position_embedding is an nn.Module
    position_embedding = build_position_encoding(args)
    train_backbone = args.lr_backbone > 0
    return_interm_layers = args.masks
    # backbone is an nn.Module
    backbone = Backbone(args.backbone, train_backbone, return_interm_layers, args.dilation)
    # chain the two together with nn.Sequential
    model = Joiner(backbone, position_embedding)
    model.num_channels = backbone.num_channels
    return model
This chains the Backbone with the PositionEmbeddingSine defined earlier: the backbone outputs feature maps of shape (b, c, h, w), and PositionEmbeddingSine then produces the corresponding position encoding of shape (b, 256, h, w).
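A hedged end-to-end shape check, assuming the repository layout of facebookresearch/detr (models/backbone.py, models/position_encoding.py, util/misc.py); the concrete image sizes are illustrative only:

import torch
from models.backbone import Backbone, Joiner
from models.position_encoding import PositionEmbeddingSine
from util.misc import nested_tensor_from_tensor_list

backbone = Backbone('resnet50', train_backbone=False, return_interm_layers=False, dilation=False)
pos_enc = PositionEmbeddingSine(num_pos_feats=128, normalize=True)
model = Joiner(backbone, pos_enc)

samples = nested_tensor_from_tensor_list([torch.rand(3, 480, 640), torch.rand(3, 512, 512)])
features, pos = model(samples)
print(features[0].tensors.shape)  # (2, 2048, h/32, w/32): last-stage feature map
print(features[0].mask.shape)     # (2, h/32, w/32): mask resampled to the feature map
print(pos[0].shape)               # (2, 256, h/32, w/32): matching position encoding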
4. Transformer
Encoder
class TransformerEncoderLayer(nn.Module):
    def __init__(self, d_model, nhead, dim_feedforward=2048, dropout=0.1,
                 activation="relu", normalize_before=False):
        super().__init__()
        self.self_attn = nn.MultiheadAttention(d_model, nhead, dropout=dropout)
        # Implementation of Feedforward model
        self.linear1 = nn.Linear(d_model, dim_feedforward)
        self.dropout = nn.Dropout(dropout)
        self.linear2 = nn.Linear(dim_feedforward, d_model)
        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)
        self.dropout1 = nn.Dropout(dropout)
        self.dropout2 = nn.Dropout(dropout)
        self.activation = _get_activation_fn(activation)
        self.normalize_before = normalize_before

    def with_pos_embed(self, tensor, pos: Optional[Tensor]):
        return tensor if pos is None else tensor + pos

    def forward_post(self,
                     src,
                     src_mask: Optional[Tensor] = None,
                     src_key_padding_mask: Optional[Tensor] = None,
                     pos: Optional[Tensor] = None):
        # slightly different from the standard Transformer: q and k are src plus
        # the position encoding, but v is still plain src,
        # i.e. v differs from q and k
        q = k = self.with_pos_embed(src, pos)
        src2 = self.self_attn(q, k, value=src, attn_mask=src_mask,
                              key_padding_mask=src_key_padding_mask)[0]
        # Add & Norm
        src = src + self.dropout1(src2)
        src = self.norm1(src)
        # FFN
        src2 = self.linear2(self.dropout(self.activation(self.linear1(src))))
        # Add & Norm
        src = src + self.dropout2(src2)
        src = self.norm2(src)
        return src

    def forward_pre(self, src,
                    src_mask: Optional[Tensor] = None,
                    src_key_padding_mask: Optional[Tensor] = None,
                    pos: Optional[Tensor] = None):
        src2 = self.norm1(src)
        q = k = self.with_pos_embed(src2, pos)
        src2 = self.self_attn(q, k, value=src2, attn_mask=src_mask,
                              key_padding_mask=src_key_padding_mask)[0]
        src = src + self.dropout1(src2)
        src2 = self.norm2(src)
        src2 = self.linear2(self.dropout(self.activation(self.linear1(src2))))
        src = src + self.dropout2(src2)
        return src

    def forward(self, src,
                src_mask: Optional[Tensor] = None,
                src_key_padding_mask: Optional[Tensor] = None,
                pos: Optional[Tensor] = None):
        if self.normalize_before:
            return self.forward_pre(src, src_mask, src_key_padding_mask, pos)
        return self.forward_post(src, src_mask, src_key_padding_mask, pos)
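A quick shape check for a single encoder layer (dummy tensors; this assumes the surrounding imports of models/transformer.py such as Optional, Tensor and _get_activation_fn are in scope). The sequence-first layout (h*w, b, 256) is what nn.MultiheadAttention expects:

import torch

layer = TransformerEncoderLayer(d_model=256, nhead=8)
src = torch.rand(850, 2, 256)   # flattened feature map, (h*w, b, 256)
pos = torch.rand(850, 2, 256)   # matching position encoding
out = layer(src, pos=pos)
print(out.shape)                # torch.Size([850, 2, 256]): shape is preserved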
With a single encoder layer defined, here is the Transformer's complete Encoder:
class TransformerEncoder(nn.Module):
    def __init__(self, encoder_layer, num_layers, norm=None):
        super().__init__()
        # clone the encoder layer 6 times
        self.layers = _get_clones(encoder_layer, num_layers)
        self.num_layers = num_layers
        self.norm = norm

    def forward(self, src,
                mask: Optional[Tensor] = None,
                src_key_padding_mask: Optional[Tensor] = None,
                pos: Optional[Tensor] = None):
        # runs the 6 encoder layers sequentially
        # src is the image feature input, shape = (h*w, b, 256)
        output = src
        for layer in self.layers:
            # the first layer takes the image features; each later layer takes
            # the output of the previous one
            output = layer(output, src_mask=mask,
                           src_key_padding_mask=src_key_padding_mask, pos=pos)
        return output
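Before reaching this encoder, the (b, 256, h, w) feature map has to be turned into a sequence. Roughly what Transformer.forward in transformer.py does, shown here with dummy tensors:

import torch

b, c, h, w = 2, 256, 25, 34                     # c = d_model = 256 after the 1x1 input_proj conv
src = torch.rand(b, c, h, w)                    # projected backbone feature map
mask = torch.zeros(b, h, w, dtype=torch.bool)   # padding mask from the NestedTensor
pos_embed = torch.rand(b, c, h, w)              # output of PositionEmbeddingSine

# flatten b x C x H x W to (HW) x b x C, the layout expected by nn.MultiheadAttention
src = src.flatten(2).permute(2, 0, 1)               # (h*w, b, 256)
pos_embed = pos_embed.flatten(2).permute(2, 0, 1)   # (h*w, b, 256)
mask = mask.flatten(1)                              # (b, h*w), used as src_key_padding_mask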
Decoder
class TransformerDecoderLayer(nn.Module):
    def __init__(self, d_model, nhead, dim_feedforward=2048, dropout=0.1,
                 activation="relu", normalize_before=False):
        super().__init__()
        self.self_attn = nn.MultiheadAttention(d_model, nhead, dropout=dropout)
        self.multihead_attn = nn.MultiheadAttention(d_model, nhead, dropout=dropout)
        # Implementation of Feedforward model
        self.linear1 = nn.Linear(d_model, dim_feedforward)
        self.dropout = nn.Dropout(dropout)
        self.linear2 = nn.Linear(dim_feedforward, d_model)
        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)
        self.norm3 = nn.LayerNorm(d_model)
        self.dropout1 = nn.Dropout(dropout)
        self.dropout2 = nn.Dropout(dropout)
        self.dropout3 = nn.Dropout(dropout)
        self.activation = _get_activation_fn(activation)
        self.normalize_before = normalize_before

    def with_pos_embed(self, tensor, pos: Optional[Tensor]):
        return tensor if pos is None else tensor + pos

    def forward_post(self, tgt, memory,
                     tgt_mask: Optional[Tensor] = None,
                     memory_mask: Optional[Tensor] = None,
                     tgt_key_padding_mask: Optional[Tensor] = None,
                     memory_key_padding_mask: Optional[Tensor] = None,
                     pos: Optional[Tensor] = None,
                     query_pos: Optional[Tensor] = None):
        # query and key are the object queries (query_pos) + the decoder input (tgt); both are (100, b, 256)
        # value is the decoder input (tgt), shape (100, b, 256)
        q = k = self.with_pos_embed(tgt, query_pos)
        # multi-head self-attention
        tgt2 = self.self_attn(q, k, value=tgt, attn_mask=tgt_mask,
                              key_padding_mask=tgt_key_padding_mask)[0]
        # Add & Norm
        tgt = tgt + self.dropout1(tgt2)
        tgt = self.norm1(tgt)
        # query is the output of the previous attention (tgt) + the object queries (query_pos)
        # key is the encoder position encoding (pos) + the encoder output (memory)
        # value is the encoder output (memory)
        tgt2 = self.multihead_attn(query=self.with_pos_embed(tgt, query_pos),
                                   key=self.with_pos_embed(memory, pos),
                                   value=memory, attn_mask=memory_mask,
                                   key_padding_mask=memory_key_padding_mask)[0]
        # Add & Norm
        tgt = tgt + self.dropout2(tgt2)
        tgt = self.norm2(tgt)
        # FFN
        tgt2 = self.linear2(self.dropout(self.activation(self.linear1(tgt))))
        tgt = tgt + self.dropout3(tgt2)
        tgt = self.norm3(tgt)
        return tgt

    def forward_pre(self, tgt, memory,
                    tgt_mask: Optional[Tensor] = None,
                    memory_mask: Optional[Tensor] = None,
                    tgt_key_padding_mask: Optional[Tensor] = None,
                    memory_key_padding_mask: Optional[Tensor] = None,
                    pos: Optional[Tensor] = None,
                    query_pos: Optional[Tensor] = None):
        tgt2 = self.norm1(tgt)
        q = k = self.with_pos_embed(tgt2, query_pos)
        tgt2 = self.self_attn(q, k, value=tgt2, attn_mask=tgt_mask,
                              key_padding_mask=tgt_key_padding_mask)[0]
        tgt = tgt + self.dropout1(tgt2)
        tgt2 = self.norm2(tgt)
        tgt2 = self.multihead_attn(query=self.with_pos_embed(tgt2, query_pos),
                                   key=self.with_pos_embed(memory, pos),
                                   value=memory, attn_mask=memory_mask,
                                   key_padding_mask=memory_key_padding_mask)[0]
        tgt = tgt + self.dropout2(tgt2)
        tgt2 = self.norm3(tgt)
        tgt2 = self.linear2(self.dropout(self.activation(self.linear1(tgt2))))
        tgt = tgt + self.dropout3(tgt2)
        return tgt

    def forward(self, tgt, memory,
                tgt_mask: Optional[Tensor] = None,
                memory_mask: Optional[Tensor] = None,
                tgt_key_padding_mask: Optional[Tensor] = None,
                memory_key_padding_mask: Optional[Tensor] = None,
                pos: Optional[Tensor] = None,
                query_pos: Optional[Tensor] = None):
        if self.normalize_before:
            return self.forward_pre(tgt, memory, tgt_mask, memory_mask,
                                    tgt_key_padding_mask, memory_key_padding_mask, pos, query_pos)
        return self.forward_post(tgt, memory, tgt_mask, memory_mask,
                                 tgt_key_padding_mask, memory_key_padding_mask, pos, query_pos)
With a single decoder layer defined, here is the Transformer's complete Decoder:
class TransformerDecoder(nn.Module):
    # note the arguments TransformerDecoder takes:
    # tgt: the decoder input; memory: the encoder output; pos: the encoder's position encoding;
    # query_pos: the object queries; plus the various masks
    def forward(self, tgt, memory,
                tgt_mask: Optional[Tensor] = None,
                memory_mask: Optional[Tensor] = None,
                tgt_key_padding_mask: Optional[Tensor] = None,
                memory_key_padding_mask: Optional[Tensor] = None,
                pos: Optional[Tensor] = None,
                query_pos: Optional[Tensor] = None):
        output = tgt
        # decoder input tgt: (100, b, 256)
        intermediate = []
        for layer in self.layers:
            output = layer(output, memory, tgt_mask=tgt_mask,
                           memory_mask=memory_mask,
                           tgt_key_padding_mask=tgt_key_padding_mask,
                           memory_key_padding_mask=memory_key_padding_mask,
                           pos=pos, query_pos=query_pos)
            if self.return_intermediate:
                intermediate.append(self.norm(output))
        if self.norm is not None:
            output = self.norm(output)
            if self.return_intermediate:
                intermediate.pop()
                intermediate.append(output)
        if self.return_intermediate:
            return torch.stack(intermediate)
        return output.unsqueeze(0)
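The decoder input tgt is not image data at all: the object queries are a learned embedding of shape (100, 256) that is broadcast over the batch, while tgt starts out as zeros. A minimal sketch of what the DETR model and Transformer.forward roughly do (dummy batch size):

import torch
import torch.nn as nn

num_queries, d_model, bs = 100, 256, 2
query_embed = nn.Embedding(num_queries, d_model)               # defined in the DETR model

query_pos = query_embed.weight.unsqueeze(1).repeat(1, bs, 1)   # (100, b, 256), the object queries
tgt = torch.zeros_like(query_pos)                              # (100, b, 256), the decoder input
# each decoder layer then receives tgt, memory (the encoder output),
# pos (the encoder position encoding) and query_pos (the object queries)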
5. FFN
class MLP(nn.Module):
    """ Very simple multi-layer perceptron (also called FFN)"""
    def __init__(self, input_dim, hidden_dim, output_dim, num_layers):
        super().__init__()
        self.num_layers = num_layers
        h = [hidden_dim] * (num_layers - 1)
        self.layers = nn.ModuleList(nn.Linear(n, k) for n, k in zip([input_dim] + h, h + [output_dim]))

    def forward(self, x):
        for i, layer in enumerate(self.layers):
            x = F.relu(layer(x)) if i < self.num_layers - 1 else layer(x)
        return x
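In detr.py this MLP serves as the box-regression head applied to every decoder output, next to a plain Linear classifier. A sketch of the typical shapes (hidden_dim = 256 and 92 classes including "no object" follow the shapes quoted below for the matcher):

import torch
import torch.nn as nn
import torch.nn.functional as F   # needed by MLP.forward above

hidden_dim, num_classes, num_queries = 256, 91, 100
class_embed = nn.Linear(hidden_dim, num_classes + 1)   # class logits, last dim 92
bbox_embed = MLP(hidden_dim, hidden_dim, 4, 3)         # 3-layer MLP -> 4 box coordinates

hs = torch.rand(6, 2, num_queries, hidden_dim)   # decoder outputs of all 6 layers
outputs_class = class_embed(hs)                  # (6, b, 100, 92)
outputs_coord = bbox_embed(hs).sigmoid()         # (6, b, 100, 4), normalized cx,cy,w,h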
The Hungarian matching class HungarianMatcher: its purpose is to compute an optimal assignment from targets to predictions. There are more predictions than targets, but we want a one-to-one matching, so the surplus predictions are matched to ∅ (the "no object" class). Overall this function builds the matching cost of Eq. (13): cost_class, cost_bbox and cost_giou correspond to the terms of that equation, and each has shape (b, 100, m), where m is the total number of GT bounding boxes in the whole batch.
# pred_logits:[b,100,92]
# pred_boxes:[b,100,4]
# targets is a list of length b; each element is a dict containing:
#   labels - a Tensor of shape (m,) holding the class indices;
#   boxes  - a Tensor of shape (m, 4) holding the bounding boxes.
# DETR classification output, num_queries=100, shape (b, 100, 92)
bs, num_queries = outputs["pred_logits"].shape[:2]
# We flatten to compute the cost matrices in a batch
out_prob = outputs["pred_logits"].flatten(0, 1).softmax(-1) # [batch_size * num_queries, num_classes] = [100b, 92]
out_bbox = outputs["pred_boxes"].flatten(0, 1) # [batch_size * num_queries, 4] = [100b, 4]
# prepare the classification targets: shape (m,), holding class indices; m covers all gt bboxes in the whole batch
# Also concat the target labels and boxes
tgt_ids = torch.cat([v["labels"] for v in targets])# (m,)[3,6,7,9,5,9,3]
# prepare the bbox targets: shape (m, 4), already normalized
tgt_bbox = torch.cat([v["boxes"] for v in targets])# (m,4)
# (100b, 92) -> (100b, m): for each prediction, keep only the class columns that actually appear in the gt; the remaining classes do not take part in the matching
# corresponds to the formula above; similar to an NLL loss but simpler
# Compute the classification cost. Contrary to the loss, we don't use the NLL,
# but approximate it in 1 - proba[target class].
# The 1 is a constant that doesn't change the matching, it can be ommitted.
# rows: take every row; columns: take only the m columns indexed by tgt_ids
cost_class = -out_prob[:, tgt_ids]# (100b, m)
# Compute the L1 cost between boxes: pairwise L1 distance between out_bbox and tgt_bbox, (100b, m)
cost_bbox = torch.cdist(out_bbox, tgt_bbox, p=1)# (100b, m)
# Compute the giou cost between boxes: an additional GIoU cost term, (100b, m)
cost_giou = -generalized_box_iou(box_cxcywh_to_xyxy(out_bbox), box_cxcywh_to_xyxy(tgt_bbox))
# the final weighted cost, (100b, m); the smaller the cost, the more likely the pair is the optimal match
# Final cost matrix
C = self.cost_bbox * cost_bbox + self.cost_class * cost_class + self.cost_giou * cost_giou
#(100b, m)--> (b, 100, m)
C = C.view(bs, num_queries, -1).cpu()
# number of gt objects per image in the batch; matching is then done per image, since batch-level matching would only add useless computation
sizes = [len(v["boxes"]) for v in targets]
# Hungarian optimal matching; returns the matched index pairs
# C.split(sizes, -1) splits the last dim per image, so each image is matched only against its own gt boxes
indices = [linear_sum_assignment(c[i]) for i, c in enumerate(C.split(sizes, -1))]
return [(torch.as_tensor(i, dtype=torch.int64), torch.as_tensor(j, dtype=torch.int64)) for i, j in indices]
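To see what linear_sum_assignment returns, here is a tiny standalone example (not DETR code): a cost matrix of 4 predictions by 2 gt boxes yields one prediction index per gt box.

import numpy as np
from scipy.optimize import linear_sum_assignment

# cost of assigning each of 4 predictions (rows) to each of 2 gt boxes (columns)
cost = np.array([[0.9, 0.2],
                 [0.1, 0.8],
                 [0.5, 0.5],
                 [0.3, 0.7]])
row_ind, col_ind = linear_sum_assignment(cost)
print(row_ind, col_ind)              # [0 1] [1 0]: prediction 0 <-> gt 1, prediction 1 <-> gt 0
print(cost[row_ind, col_ind].sum())  # 0.3, the minimal total cost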
Once the matching is known, computing the loss follows naturally: loss_labels computes the classification loss, and loss_boxes computes the regression loss, which combines an L1 loss and a GIoU loss.
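A hedged sketch of how the matched indices are typically consumed, mirroring the logic of SetCriterion in detr.py in a simplified form (not a drop-in copy): gather the matched predictions, fill a full-length class target with the "no object" class, and apply cross-entropy on all queries plus an L1 (and, in DETR, GIoU) term on the matched boxes only:

import torch
import torch.nn.functional as F

def simple_losses(outputs, targets, indices, num_classes=91):
    # flat (batch, query) indices of the matched predictions
    batch_idx = torch.cat([torch.full_like(src, i) for i, (src, _) in enumerate(indices)])
    src_idx = torch.cat([src for (src, _) in indices])

    # classification: every query is labelled "no object" (index num_classes) unless matched
    src_logits = outputs['pred_logits']                                   # (b, 100, 92)
    target_classes = torch.full(src_logits.shape[:2], num_classes, dtype=torch.int64)
    target_classes[batch_idx, src_idx] = torch.cat(
        [t['labels'][j] for t, (_, j) in zip(targets, indices)])
    loss_ce = F.cross_entropy(src_logits.transpose(1, 2), target_classes)

    # box regression: only the matched queries contribute
    src_boxes = outputs['pred_boxes'][batch_idx, src_idx]                 # (m, 4)
    tgt_boxes = torch.cat([t['boxes'][j] for t, (_, j) in zip(targets, indices)])
    loss_bbox = F.l1_loss(src_boxes, tgt_boxes)
    # the GIoU term would call generalized_box_iou on these same matched pairs
    return loss_ce, loss_bbox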