DETR Code Walkthrough

I. Source Code

https://github.com/facebookresearch/detr

II. Code Analysis

1. Preliminaries

  • 2D positional encoding:

    The position matrices x_embed and y_embed are built with cumsum, which accumulates the elements of a tensor along a given axis. After the cumulative sum, the last element along that axis equals the total sum, so no separate summation step is needed: the code normalizes directly by that last element, i.e. x_embed[:,:,-1:] and y_embed[:,-1:,:]. (A small numeric sketch follows this list.)

  • Shapes of some variables in the code:

    tensor_list is a NestedTensor, a small wrapper defined in DETR's util/misc.py (inspired by the pytorch nestedtensor project) that packs a batched tensor together with a padding mask describing its dynamic shape. The mask is True at padded positions and False at valid pixels; with no padding it is all False.

    x: (b, c, H, W)

    mask: (b, H, W), all False here (no padding).

    not_mask: (b, H, W), all True.

    The initial y_embed: (b, H, W), with values 1,1,1,1,..., 2,2,2,2,..., 3,3,3,3,... down the height axis.

    The initial x_embed: (b, H, W), with values 1,2,3,4,..., 1,2,3,4,..., 1,2,3,4,... along the width axis.

    self.num_pos_feats = 128

    The initial dim_t = [0, 1, 2, 3, ..., 127]

    pos_x: (b, H, W, 128)

    pos_y: (b, H, W, 128)

    The number passed to flatten() is the axis from which flattening starts.

    torch.stack((pos_y[:,:,:,0::2].sin(), pos_y[:,:,:,1::2].cos()), dim=4)

    After this step the shape is (b, H, W, 64, 2); flatten(3) then flattens from axis 3 onward, interleaving sin and cos and giving (b, H, W, 128).

    torch.cat((pos_y, pos_x), dim=3) then gives (b, H, W, 256), which is finally permuted to (b, 256, H, W).

    PositionEmbeddingSine inherits from nn.Module.
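
Here is the numeric sketch referred to above: a minimal, standalone illustration (not from the DETR source; sizes shrunk from 128 to 4 so the tensors stay small) of the cumsum and stack/flatten steps:

import torch

b, H, W, num_pos_feats = 1, 2, 3, 4                  # shrunk from (b, H, W, 128) for readability
not_mask = torch.ones(b, H, W, dtype=torch.bool)

y_embed = not_mask.cumsum(1, dtype=torch.float32)    # rows: 1,1,1 / 2,2,2
x_embed = not_mask.cumsum(2, dtype=torch.float32)    # cols: 1,2,3 / 1,2,3

dim_t = torch.arange(num_pos_feats, dtype=torch.float32)
dim_t = 10000 ** (2 * (dim_t // 2) / num_pos_feats)

pos_x = x_embed[:, :, :, None] / dim_t               # (b, H, W, 4)
pos_x = torch.stack((pos_x[:, :, :, 0::2].sin(),
                     pos_x[:, :, :, 1::2].cos()), dim=4)   # (b, H, W, 2, 2)
print(pos_x.shape)             # torch.Size([1, 2, 3, 2, 2])
print(pos_x.flatten(3).shape)  # torch.Size([1, 2, 3, 4])  -> sin/cos interleaved on the last axis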

2. Positional Encoding

class PositionEmbeddingSine(nn.Module):

    def forward(self, tensor_list: NestedTensor):
# input is (b, c, h, w)
# tensor_list is a NestedTensor that carries a padding mask alongside the tensors;
# it is DETR's own wrapper, inspired by https://github.com/pytorch/nestedtensor
        x = tensor_list.tensors
# the attached mask, shape (b, h, w); True marks padding (all False when nothing is padded)
        mask = tensor_list.mask
        assert mask is not None
        not_mask = ~mask
# the image is 2D, so the positional encoding is split into x and y directions
# 1 1 1 1 ..  2 2 2 2... 3 3 3...
        y_embed = not_mask.cumsum(1, dtype=torch.float32)
# 1 2 3 4 ... 1 2 3 4...
        x_embed = not_mask.cumsum(2, dtype=torch.float32)
        if self.normalize:
            eps = 1e-6
            y_embed = y_embed / (y_embed[:, -1:, :] + eps) * self.scale
            x_embed = x_embed / (x_embed[:, :, -1:] + eps) * self.scale

# num_pos_feats = 128
# 0~127; self.num_pos_feats = 128 because the full encoding is 256-d: half for y and half for x, with sin and cos alternating inside each half
        dim_t = torch.arange(self.num_pos_feats, dtype=torch.float32, device=x.device)
        dim_t = self.temperature ** (2 * (dim_t // 2) / self.num_pos_feats)

# output shape = (b, h, w, 128)
        pos_x = x_embed[:, :, :, None] / dim_t
        pos_y = y_embed[:, :, :, None] / dim_t
        pos_x = torch.stack((pos_x[:, :, :, 0::2].sin(), pos_x[:, :, :, 1::2].cos()), dim=4).flatten(3)
        pos_y = torch.stack((pos_y[:, :, :, 0::2].sin(), pos_y[:, :, :, 1::2].cos()), dim=4).flatten(3)
        pos = torch.cat((pos_y, pos_x), dim=3).permute(0, 3, 1, 2)
# every (x, y) location of the feature map is encoded as a 256-d vector: the first 128 dims encode y, the last 128 encode x
        return pos
# output: (b, 256, h, w)

The authors define a data structure, NestedTensor, that packs two variables: the tensor data x and a mask indicating which positions of x were added by padding.

not_mask is the logical negation of mask: it marks whether a feature-map location (x, y) holds a real value rather than padding.

The positions are then encoded. Since the encoding is 2D rather than 1D, both the x and y directions have to be covered; cumsum() is used to generate running indices along each axis.

After normalization this yields the x and y encodings, and cat() finally concatenates the two 128-d vectors into a 256-d vector, which is returned as the positional encoding pos.
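
A usage sketch (assuming the script is run from the DETR repo root so models/ and util/ are importable; the shapes are illustrative):

import torch
from models.position_encoding import PositionEmbeddingSine   # DETR repo module
from util.misc import NestedTensor

feat = torch.randn(2, 2048, 25, 34)                 # a backbone feature map (b, c, h, w)
mask = torch.zeros(2, 25, 34, dtype=torch.bool)     # no padding -> all False
pos_embed = PositionEmbeddingSine(num_pos_feats=128, normalize=True)
pos = pos_embed(NestedTensor(feat, mask))
print(pos.shape)   # torch.Size([2, 256, 25, 34]) -> (b, 256, h, w)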

3. Backbone

class BackboneBase(nn.Module):

    def __init__(self, backbone: nn.Module, train_backbone: bool, num_channels: int, return_interm_layers: bool):
        super().__init__()
        for name, parameter in backbone.named_parameters():
            if not train_backbone or 'layer2' not in name and 'layer3' not in name and 'layer4' not in name:
                parameter.requires_grad_(False)
        if return_interm_layers:
            return_layers = {"layer1": "0", "layer2": "1", "layer3": "2", "layer4": "3"}
        else:
            return_layers = {'layer4': "0"}

# IntermediateLayerGetter wraps the backbone module passed to BackboneBase and returns the layers selected by return_layers (which depends on the bool return_interm_layers)
        self.body = IntermediateLayerGetter(backbone, return_layers=return_layers)
        self.num_channels = num_channels

    def forward(self, tensor_list: NestedTensor):
# the input to BackboneBase is a NestedTensor
# xs holds the intermediate-layer outputs
        xs = self.body(tensor_list.tensors)
        out: Dict[str, NestedTensor] = {}
        for name, x in xs.items():
            m = tensor_list.mask
            assert m is not None
# F.interpolate resizes the mask to the spatial size of each feature map
# to(torch.bool) converts the interpolated mask back to a boolean tensor
            mask = F.interpolate(m[None].float(), size=x.shape[-2:]).to(torch.bool)[0]
            out[name] = NestedTensor(x, mask)
        return out
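
A standalone sketch of just the mask-resizing step (illustrative shapes, not DETR code):

import torch
import torch.nn.functional as F

m = torch.zeros(2, 480, 640, dtype=torch.bool)    # original padding mask (b, H, W)
x = torch.randn(2, 2048, 15, 20)                   # a layer4 feature map
mask = F.interpolate(m[None].float(), size=x.shape[-2:]).to(torch.bool)[0]
print(mask.shape)   # torch.Size([2, 15, 20]): one mask entry per feature-map location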


class Backbone(BackboneBase):
    """ResNet backbone with frozen BatchNorm."""
    def __init__(self, name: str,
                 train_backbone: bool,
                 return_interm_layers: bool,
                 dilation: bool):
# pick the torchvision backbone by name, set num_channels accordingly, and pass everything to BackboneBase.__init__
        backbone = getattr(torchvision.models, name)(
            replace_stride_with_dilation=[False, False, dilation],
            pretrained=is_main_process(), norm_layer=FrozenBatchNorm2d)
        num_channels = 512 if name in ('resnet18', 'resnet34') else 2048
        super().__init__(backbone, train_backbone, num_channels, return_interm_layers)

A ResNet (with frozen BatchNorm) is used as the backbone.
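
IntermediateLayerGetter comes from torchvision (torchvision.models._utils); a quick standalone check of what it returns (shapes are illustrative):

import torch
import torchvision
from torchvision.models._utils import IntermediateLayerGetter

resnet = torchvision.models.resnet50(pretrained=False)
body = IntermediateLayerGetter(resnet, return_layers={"layer4": "0"})
feats = body(torch.randn(1, 3, 640, 640))
print({k: v.shape for k, v in feats.items()})   # {'0': torch.Size([1, 2048, 20, 20])}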

class Joiner(nn.Sequential):
    def __init__(self, backbone, position_embedding):
        super().__init__(backbone, position_embedding)

    def forward(self, tensor_list: NestedTensor):
        xs = self[0](tensor_list)
        out: List[NestedTensor] = []
        pos = []
        for name, x in xs.items():
            out.append(x)
            # position encoding
            pos.append(self[1](x).to(x.tensors.dtype))

        return out, pos

def build_backbone(args):
# position_embedding is an nn.Module
    position_embedding = build_position_encoding(args)
    train_backbone = args.lr_backbone > 0
    return_interm_layers = args.masks
# backbone is an nn.Module
    backbone = Backbone(args.backbone, train_backbone, return_interm_layers, args.dilation)
# chain the two together with nn.Sequential (Joiner)
    model = Joiner(backbone, position_embedding)
    model.num_channels = backbone.num_channels
    return model

This joins the Backbone with the PositionEmbeddingSine defined earlier: the backbone outputs feature maps of shape (b, c, h, w), and PositionEmbeddingSine then produces a positional encoding of shape (b, 256, h, w) for each of them.
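
A usage sketch of build_backbone with a hypothetical minimal args namespace (the field names follow DETR's argument parser in main.py; the first run downloads the pretrained ResNet weights):

import torch
from argparse import Namespace
from models.backbone import build_backbone
from util.misc import nested_tensor_from_tensor_list

args = Namespace(backbone='resnet50', lr_backbone=1e-5, masks=False,
                 dilation=False, hidden_dim=256, position_embedding='sine')
model = build_backbone(args)

samples = nested_tensor_from_tensor_list([torch.randn(3, 480, 640)])
features, pos = model(samples)
print(features[-1].tensors.shape)   # torch.Size([1, 2048, 15, 20])
print(pos[-1].shape)                # torch.Size([1, 256, 15, 20])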

4. Transformer

Encoder

class TransformerEncoderLayer(nn.Module):

    def __init__(self, d_model, nhead, dim_feedforward=2048, dropout=0.1,
                 activation="relu", normalize_before=False):
        super().__init__()
        self.self_attn = nn.MultiheadAttention(d_model, nhead, dropout=dropout)
        # Implementation of Feedforward model
        self.linear1 = nn.Linear(d_model, dim_feedforward)
        self.dropout = nn.Dropout(dropout)
        self.linear2 = nn.Linear(dim_feedforward, d_model)

        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)
        self.dropout1 = nn.Dropout(dropout)
        self.dropout2 = nn.Dropout(dropout)

        self.activation = _get_activation_fn(activation)
        self.normalize_before = normalize_before

    def with_pos_embed(self, tensor, pos: Optional[Tensor]):
        return tensor if pos is None else tensor + pos

    def forward_post(self,
                     src,
                     src_mask: Optional[Tensor] = None,
                     src_key_padding_mask: Optional[Tensor] = None,
                     pos: Optional[Tensor] = None):
    # Slightly different from the vanilla Transformer: q and k are src plus the positional
    # encoding, while v is still src itself, i.e. v differs from q and k
        q = k = self.with_pos_embed(src, pos)
        src2 = self.self_attn(q, k, value=src, attn_mask=src_mask,
                              key_padding_mask=src_key_padding_mask)[0]
#Add and Norm
        src = src + self.dropout1(src2)
        src = self.norm1(src)
#FFN
        src2 = self.linear2(self.dropout(self.activation(self.linear1(src))))
#Add and Norm
        src = src + self.dropout2(src2)
        src = self.norm2(src)
        return src

    def forward_pre(self, src,
                    src_mask: Optional[Tensor] = None,
                    src_key_padding_mask: Optional[Tensor] = None,
                    pos: Optional[Tensor] = None):
        src2 = self.norm1(src)
        q = k = self.with_pos_embed(src2, pos)
        src2 = self.self_attn(q, k, value=src2, attn_mask=src_mask,
                              key_padding_mask=src_key_padding_mask)[0]
        src = src + self.dropout1(src2)
        src2 = self.norm2(src)
        src2 = self.linear2(self.dropout(self.activation(self.linear1(src2))))
        src = src + self.dropout2(src2)
        return src

    def forward(self, src,
                src_mask: Optional[Tensor] = None,
                src_key_padding_mask: Optional[Tensor] = None,
                pos: Optional[Tensor] = None):
        if self.normalize_before:
            return self.forward_pre(src, src_mask, src_key_padding_mask, pos)
        return self.forward_post(src, src_mask, src_key_padding_mask, pos)

With a single encoder layer defined, here is the full Transformer encoder:

class TransformerEncoder(nn.Module):
    def __init__(self, encoder_layer, num_layers, norm=None):
        super().__init__()
        # clone the encoder layer num_layers (6) times
        self.layers = _get_clones(encoder_layer, num_layers)
        self.num_layers = num_layers
        self.norm = norm

    def forward(self, src,
                mask: Optional[Tensor] = None,
                src_key_padding_mask: Optional[Tensor] = None,
                pos: Optional[Tensor] = None):
        # run the 6 encoder layers sequentially
        # src is the flattened image feature, shape (h*w, b, 256)
        output = src
        for layer in self.layers:
            # the first layer takes the image features; each later layer takes the previous layer's output
            output = layer(output, src_mask=mask,
                           src_key_padding_mask=src_key_padding_mask, pos=pos)
        return output
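
For reference, the _get_clones helper used above (defined in models/transformer.py) simply deep-copies the layer N times into an nn.ModuleList, so the six encoder layers do not share weights; it is essentially:

import copy
import torch.nn as nn

def _get_clones(module, N):
    # N independent copies of the same layer
    return nn.ModuleList([copy.deepcopy(module) for _ in range(N)])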

Decoder

class TransformerDecoderLayer(nn.Module):

    def __init__(self, d_model, nhead, dim_feedforward=2048, dropout=0.1,
                 activation="relu", normalize_before=False):
        super().__init__()
        self.self_attn = nn.MultiheadAttention(d_model, nhead, dropout=dropout)
        self.multihead_attn = nn.MultiheadAttention(d_model, nhead, dropout=dropout)
        # Implementation of Feedforward model
        self.linear1 = nn.Linear(d_model, dim_feedforward)
        self.dropout = nn.Dropout(dropout)
        self.linear2 = nn.Linear(dim_feedforward, d_model)

        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)
        self.norm3 = nn.LayerNorm(d_model)
        self.dropout1 = nn.Dropout(dropout)
        self.dropout2 = nn.Dropout(dropout)
        self.dropout3 = nn.Dropout(dropout)

        self.activation = _get_activation_fn(activation)
        self.normalize_before = normalize_before

    def with_pos_embed(self, tensor, pos: Optional[Tensor]):
        return tensor if pos is None else tensor + pos
        
    def forward_post(self, tgt, memory,
                     tgt_mask: Optional[Tensor] = None,
                     memory_mask: Optional[Tensor] = None,
                     tgt_key_padding_mask: Optional[Tensor] = None,
                     memory_key_padding_mask: Optional[Tensor] = None,
                     pos: Optional[Tensor] = None,
                     query_pos: Optional[Tensor] = None):
                     
# q and k are the object queries (query_pos) plus the decoder input (tgt), both of shape (100, b, 256)
# v is the decoder input (tgt) alone, shape (100, b, 256)
        q = k = self.with_pos_embed(tgt, query_pos)

#Multi-head self-attention
        tgt2 = self.self_attn(q, k, value=tgt, attn_mask=tgt_mask,
                              key_padding_mask=tgt_key_padding_mask)[0]
#Add and Norm
        tgt = tgt + self.dropout1(tgt2)
        tgt = self.norm1(tgt)

# query: output of the previous attention block (tgt) + object queries (query_pos)
# key: encoder output (memory) + encoder positional encoding (pos)
# value: encoder output (memory) alone
        tgt2 = self.multihead_attn(query=self.with_pos_embed(tgt, query_pos),
                                   key=self.with_pos_embed(memory, pos),
                                   value=memory, attn_mask=memory_mask,
                                   key_padding_mask=memory_key_padding_mask)[0]

#Add and Norm
        tgt = tgt + self.dropout2(tgt2)
        tgt = self.norm2(tgt)

#FFN
        tgt2 = self.linear2(self.dropout(self.activation(self.linear1(tgt))))
        tgt = tgt + self.dropout3(tgt2)
        tgt = self.norm3(tgt)
        return tgt

    def forward_pre(self, tgt, memory,
                    tgt_mask: Optional[Tensor] = None,
                    memory_mask: Optional[Tensor] = None,
                    tgt_key_padding_mask: Optional[Tensor] = None,
                    memory_key_padding_mask: Optional[Tensor] = None,
                    pos: Optional[Tensor] = None,
                    query_pos: Optional[Tensor] = None):
        tgt2 = self.norm1(tgt)
        q = k = self.with_pos_embed(tgt2, query_pos)
        tgt2 = self.self_attn(q, k, value=tgt2, attn_mask=tgt_mask,
                              key_padding_mask=tgt_key_padding_mask)[0]
        tgt = tgt + self.dropout1(tgt2)
        tgt2 = self.norm2(tgt)
        tgt2 = self.multihead_attn(query=self.with_pos_embed(tgt2, query_pos),
                                   key=self.with_pos_embed(memory, pos),
                                   value=memory, attn_mask=memory_mask,
                                   key_padding_mask=memory_key_padding_mask)[0]
        tgt = tgt + self.dropout2(tgt2)
        tgt2 = self.norm3(tgt)
        tgt2 = self.linear2(self.dropout(self.activation(self.linear1(tgt2))))
        tgt = tgt + self.dropout3(tgt2)
        return tgt

    def forward(self, tgt, memory,
                tgt_mask: Optional[Tensor] = None,
                memory_mask: Optional[Tensor] = None,
                tgt_key_padding_mask: Optional[Tensor] = None,
                memory_key_padding_mask: Optional[Tensor] = None,
                pos: Optional[Tensor] = None,
                query_pos: Optional[Tensor] = None):
        if self.normalize_before:
            return self.forward_pre(tgt, memory, tgt_mask, memory_mask,
                                    tgt_key_padding_mask, memory_key_padding_mask, pos, query_pos)
        return self.forward_post(tgt, memory, tgt_mask, memory_mask,
                                 tgt_key_padding_mask, memory_key_padding_mask, pos, query_pos)

With a single decoder layer defined, here is the full Transformer decoder:

class TransformerDecoder(nn.Module):

# Note the arguments TransformerDecoder expects:
# tgt: decoder input; memory: encoder output; pos: encoder positional encoding; query_pos: the object queries; plus the various masks
    def forward(self, tgt, memory,
                tgt_mask: Optional[Tensor] = None,
                memory_mask: Optional[Tensor] = None,
                tgt_key_padding_mask: Optional[Tensor] = None,
                memory_key_padding_mask: Optional[Tensor] = None,
                pos: Optional[Tensor] = None,
                query_pos: Optional[Tensor] = None):
        output = tgt

# the decoder input tgt has shape (100, b, 256)
        intermediate = []

        for layer in self.layers:
            output = layer(output, memory, tgt_mask=tgt_mask,
                           memory_mask=memory_mask,
                           tgt_key_padding_mask=tgt_key_padding_mask,
                           memory_key_padding_mask=memory_key_padding_mask,
                           pos=pos, query_pos=query_pos)
            if self.return_intermediate:
                intermediate.append(self.norm(output))

        if self.norm is not None:
            output = self.norm(output)
            if self.return_intermediate:
                intermediate.pop()
                intermediate.append(output)

        if self.return_intermediate:
            return torch.stack(intermediate)

        return output.unsqueeze(0)
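
Inside Transformer.forward (models/transformer.py) the projected features and positional encodings are flattened to (h*w, b, 256), the 100 learned object queries are repeated for each batch element, and the decoder input tgt is initialized to zeros. A usage sketch of the assembled module (assuming the DETR repo is on the import path):

import torch
import torch.nn as nn
from models.transformer import Transformer   # DETR repo module

transformer = Transformer(d_model=256, nhead=8, num_encoder_layers=6,
                          num_decoder_layers=6, return_intermediate_dec=True)
src = torch.randn(2, 256, 15, 20)                  # backbone features after the 1x1 input projection
mask = torch.zeros(2, 15, 20, dtype=torch.bool)    # padding mask
query_embed = nn.Embedding(100, 256).weight        # learned object queries
pos_embed = torch.randn(2, 256, 15, 20)            # output of PositionEmbeddingSine

hs, memory = transformer(src, mask, query_embed, pos_embed)
print(hs.shape)       # torch.Size([6, 2, 100, 256]): one output per decoder layer (return_intermediate_dec=True)
print(memory.shape)   # torch.Size([2, 256, 15, 20])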

5. FFN

class MLP(nn.Module):
    """ Very simple multi-layer perceptron (also called FFN)"""

    def __init__(self, input_dim, hidden_dim, output_dim, num_layers):
        super().__init__()
        self.num_layers = num_layers
        h = [hidden_dim] * (num_layers - 1)
        self.layers = nn.ModuleList(nn.Linear(n, k) for n, k in zip([input_dim] + h, h + [output_dim]))

    def forward(self, x):
        for i, layer in enumerate(self.layers):
            x = F.relu(layer(x)) if i < self.num_layers - 1 else layer(x)
        return x
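
In DETR this MLP serves as the box head: bbox_embed = MLP(hidden_dim, hidden_dim, 4, 3), applied to the decoder outputs and passed through a sigmoid so the boxes come out as normalized (cx, cy, w, h). A quick check using the class defined above:

import torch

bbox_embed = MLP(input_dim=256, hidden_dim=256, output_dim=4, num_layers=3)
hs = torch.randn(6, 2, 100, 256)         # decoder outputs: (layers, b, queries, dim)
outputs_coord = bbox_embed(hs).sigmoid()
print(outputs_coord.shape)                # torch.Size([6, 2, 100, 4])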

The Hungarian matching class HungarianMatcher: its purpose is to compute an optimal assignment from targets to predictions. There are more predictions than targets, but the matching must be 1-to-1, so the surplus predictions end up matched to ∅ (the "no object" class). As a whole the function builds Eq. (13): cost_class, cost_bbox and cost_giou correspond to the loss terms of that equation. They are computed with shape (100b, m), and the combined cost is then reshaped to (b, 100, m), where m counts all the GT bounding boxes in the batch.

# (inside HungarianMatcher.forward)
# pred_logits: [b, 100, 92]
# pred_boxes: [b, 100, 4]
# targets is a list of length b; each element is a dict with:
#   labels - tensor of shape (m_i,) holding class indices; boxes - tensor of shape (m_i, 4) holding bounding boxes
# DETR classification output, num_queries = 100, shape (b, 100, 92)
        bs, num_queries = outputs["pred_logits"].shape[:2]


        # We flatten to compute the cost matrices in a batch
        out_prob = outputs["pred_logits"].flatten(0, 1).softmax(-1)  # [batch_size * num_queries, num_classes] = [100b, 92]
        out_bbox = outputs["pred_boxes"].flatten(0, 1)  # [batch_size * num_queries, 4] = [100b, 4]

# concatenated classification targets, shape (m,), holding class indices; m covers all gt boxes in the batch
        # Also concat the target labels and boxes
        tgt_ids = torch.cat([v["labels"] for v in targets])  # (m,), e.g. [3, 6, 7, 9, 5, 9, 3]
# concatenated bbox targets, shape (m, 4), already normalized
        tgt_bbox = torch.cat([v["boxes"] for v in targets])# (m,4)

# (100b, 92) -> (100b, m): for each prediction, keep only the probabilities of classes that actually appear in the gt; other classes take no part in the matching
# this corresponds to the formula above; it is similar to an NLL loss but simpler
        # Compute the classification cost. Contrary to the loss, we don't use the NLL,
        # but approximate it in 1 - proba[target class].
        # The 1 is a constant that doesn't change the matching, it can be omitted.
# rows: every prediction; columns: only the m columns indexed by tgt_ids
        cost_class = -out_prob[:, tgt_ids]# (100b, m)

        # Compute the L1 cost between boxes: pairwise L1 distance between out_bbox and tgt_bbox, (100b, m)
        cost_bbox = torch.cdist(out_bbox, tgt_bbox, p=1)# (100b, m)

        # Compute the GIoU cost between boxes (an additional cost term), (100b, m)
        cost_giou = -generalized_box_iou(box_cxcywh_to_xyxy(out_bbox), box_cxcywh_to_xyxy(tgt_bbox))

# final weighted cost, (100b, m); a smaller cost means a more likely match
        # Final cost matrix
        C = self.cost_bbox * cost_bbox + self.cost_class * cost_class + self.cost_giou * cost_giou
#(100b, m)--> (b, 100, m)
        C = C.view(bs, num_queries, -1).cpu()

# number of gt boxes per image; the matching is done image by image, since batch-level matching would only add useless computation
        sizes = [len(v["boxes"]) for v in targets]
# Hungarian matching (linear_sum_assignment) returns the optimal matching indices
# C.split(sizes, -1) splits the last dimension per image: the i-th chunk has shape (b, 100, m_i)
        indices = [linear_sum_assignment(c[i]) for i, c in enumerate(C.split(sizes, -1))]   
        return [(torch.as_tensor(i, dtype=torch.int64), torch.as_tensor(j, dtype=torch.int64)) for i, j in indices]
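
A toy illustration (not DETR code) of what scipy's linear_sum_assignment returns for a single image with 5 predictions and 2 ground-truth boxes:

import numpy as np
from scipy.optimize import linear_sum_assignment

cost = np.array([[0.9, 0.2],    # rows: 5 predictions, columns: 2 gt boxes
                 [0.1, 0.8],
                 [0.5, 0.5],
                 [0.7, 0.3],
                 [0.4, 0.6]])
row_ind, col_ind = linear_sum_assignment(cost)
print(row_ind, col_ind)   # [0 1] [1 0]: prediction 0 is matched to gt 1, prediction 1 to gt 0;
                          # the remaining predictions stay unmatched and are supervised as "no object"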

Once the matching is known, computing the losses is straightforward: loss_labels computes the classification loss, and loss_boxes computes the box regression loss, which combines an L1 loss and a GIoU loss.
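
As a rough sketch of the box part, here is approximately what SetCriterion.loss_boxes computes on the matched pairs, wrapped in a hypothetical helper for illustration (src_boxes/target_boxes are the matched predictions and ground truths gathered with the indices above; num_boxes is the normalization constant; box_cxcywh_to_xyxy and generalized_box_iou live in util/box_ops.py):

import torch
import torch.nn.functional as F
from util.box_ops import box_cxcywh_to_xyxy, generalized_box_iou   # DETR repo helpers

def box_losses(src_boxes, target_boxes, num_boxes):
    # L1 loss directly on the normalized (cx, cy, w, h) boxes
    loss_bbox = F.l1_loss(src_boxes, target_boxes, reduction='none')
    # GIoU loss: 1 minus the diagonal of the pairwise GIoU matrix (matched pairs only)
    loss_giou = 1 - torch.diag(generalized_box_iou(
        box_cxcywh_to_xyxy(src_boxes),
        box_cxcywh_to_xyxy(target_boxes)))
    return {'loss_bbox': loss_bbox.sum() / num_boxes,
            'loss_giou': loss_giou.sum() / num_boxes}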