【人工智能学习之HDGCN18关键点修改】
- 训练部分
- 修改部分
训练部分
请参考文章:【人工智能学习之HDGCN训练自己的数据集】
修改部分
参考源码中25关键点的区域划分,我们将18关键点划分为:
头部:
- 鼻子
- 左眼和左耳
- 右眼和右耳
上肢:
- 左肩、左肘、左腕
- 右肩、右肘、右腕
下肢:
- 左髋、左膝、左踝
- 右髋、右膝、右踝
躯干:
- 颈部、左右肩、左右髋
对于【人工智能学习之HDGCN训练自己的数据集】中模型移植与修改部分,我的修改内容如下:
HDhierarchy.py:
from audioop import reverse
import sys
import numpy as np

sys.path.extend(['../'])

# Number of joints in the skeleton used by this module (18-keypoint layout).
num_node = 18


def edge2mat(link, num_node):
    """Build a dense adjacency matrix from a list of directed edges.

    Args:
        link: iterable of ``(i, j)`` index pairs (0-based).
        num_node: size of the square matrix.

    Returns:
        ``(num_node, num_node)`` float array with ``A[j, i] = 1`` for every
        ``(i, j)`` in ``link`` (note the transposed indexing).
    """
    A = np.zeros((num_node, num_node))
    for i, j in link:
        A[j, i] = 1
    return A


def normalize_digraph(A):
    """Scale every column of ``A`` by the inverse of its column sum.

    Columns whose sum is not positive are zeroed by the multiplication with
    the diagonal matrix (their diagonal entry stays 0).
    """
    Dl = np.sum(A, 0)
    h, w = A.shape
    Dn = np.zeros((w, w))
    for i in range(w):
        if Dl[i] > 0:
            Dn[i, i] = Dl[i] ** (-1)
    return np.dot(A, Dn)


def get_spatial_graph(num_node, hierarchy):
    """Stack one column-normalized adjacency matrix per edge list in ``hierarchy``."""
    A = [normalize_digraph(edge2mat(edges, num_node)) for edges in hierarchy]
    return np.stack(A)


def get_spatial_graph_original(num_node, self_link, inward, outward):
    """Return the stacked (identity, inward, outward) adjacency matrices."""
    I = edge2mat(self_link, num_node)
    In = normalize_digraph(edge2mat(inward, num_node))
    Out = normalize_digraph(edge2mat(outward, num_node))
    return np.stack((I, In, Out))


def normalize_adjacency_matrix(A):
    """Return ``D^-1/2 @ A @ D^-1/2`` as float32, with ``D`` the row-sum degrees.

    Nodes whose degree is 0 get a scaling factor of 0 instead of ``inf``;
    without this guard a single isolated node turns the whole result into NaN
    (``0 * inf``).
    """
    node_degrees = np.asarray(A.sum(-1), dtype=np.float64)
    degs_inv_sqrt = np.zeros_like(node_degrees)
    connected = node_degrees > 0
    degs_inv_sqrt[connected] = np.power(node_degrees[connected], -0.5)
    norm_degs_matrix = np.eye(len(node_degrees)) * degs_inv_sqrt
    return (norm_degs_matrix @ A @ norm_degs_matrix).astype(np.float32)


def get_graph(num_node, edges):
    """Build the three adjacency matrices of one hierarchy layer.

    Args:
        num_node: number of joints.
        edges: ``[identity_links, forward_links, reverse_links]``.

    Returns:
        Array of shape ``(3, num_node, num_node)``; the identity matrix is left
        unnormalized, the other two are column-normalized.
    """
    I = edge2mat(edges[0], num_node)
    Forward = normalize_digraph(edge2mat(edges[1], num_node))
    Reverse = normalize_digraph(edge2mat(edges[2], num_node))
    return np.stack((I, Forward, Reverse))  # 3, num_node, num_node


def get_hierarchical_graph(num_node, edges):
    """Stack ``get_graph`` over all layers: shape ``(L, 3, num_node, num_node)``."""
    A = [get_graph(num_node, edge) for edge in edges]
    return np.stack(A)


def get_groups(dataset='NTU', CoM=18):
    """Return the ordered joint groups (1-based joint ids) of the hierarchy.

    Consecutive groups are later connected to each other by ``get_edgeset``.
    The ``CoM`` values 2, 21 and 1 reference joint ids up to 25, so they do
    not fit the 18-node graph built by ``Graph`` in this module; ``CoM == 18``
    selects the 18-keypoint grouping.

    Raises:
        ValueError: if ``dataset`` or ``CoM`` is not supported.
    """
    groups = []

    if dataset != 'NTU':
        # Previously an empty list was returned here, which only failed later
        # inside np.stack with an unrelated message.
        raise ValueError('unsupported dataset: {!r}'.format(dataset))

    if CoM == 2:
        groups.append([2])
        groups.append([1, 21])
        groups.append([13, 17, 3, 5, 9])
        groups.append([14, 18, 4, 6, 10])
        groups.append([15, 19, 7, 11])
        groups.append([16, 20, 8, 12])
        groups.append([22, 23, 24, 25])
    # Center of mass: 21
    elif CoM == 21:
        groups.append([21])
        groups.append([2, 3, 5, 9])
        groups.append([4, 6, 10, 1])
        groups.append([7, 11, 13, 17])
        groups.append([8, 12, 14, 18])
        groups.append([22, 23, 24, 25, 15, 19])
        groups.append([16, 20])
    # Center of mass: 1
    elif CoM == 1:
        groups.append([1])
        groups.append([2, 13, 17])
        groups.append([14, 18, 21])
        groups.append([3, 5, 9, 15, 19])
        groups.append([4, 6, 10, 16, 20])
        groups.append([7, 11])
        groups.append([8, 12, 22, 23, 24, 25])
    elif CoM == 18:
        # Head
        groups.append([1])  # nose
        groups.append([15, 17])  # left eye and left ear
        groups.append([16, 18])  # right eye and right ear
        # Upper limbs
        groups.append([3, 4, 5])  # left shoulder, left elbow, left wrist
        groups.append([6, 7, 8])  # right shoulder, right elbow, right wrist
        # Lower limbs
        groups.append([9, 10, 11])  # left hip, left knee, left ankle
        groups.append([12, 13, 14])  # right hip, right knee, right ankle
        # Torso
        groups.append([2, 3, 6, 9, 12])  # neck, both shoulders, both hips
    else:
        raise ValueError('unsupported CoM: {!r}'.format(CoM))

    return groups


def get_edgeset(dataset='NTU', CoM=18):
    """Build per-layer edge lists between consecutive joint groups.

    Returns:
        A list with ``len(groups) - 1`` entries. Entry ``i`` is
        ``[identity, forward, reverse]`` where ``identity`` holds self-loops
        for the joints of groups ``i`` and ``i + 1``, ``forward`` holds every
        pair from group ``i`` to group ``i + 1`` and ``reverse`` holds every
        pair from group ``i + 1`` back to group ``i``. Indices are 0-based.
    """
    groups = get_groups(dataset=dataset, CoM=CoM)
    # Joint ids in get_groups are 1-based; matrices are indexed from 0.
    groups = [[joint - 1 for joint in group] for group in groups]

    edges = []
    for upper, lower in zip(groups[:-1], groups[1:]):
        identity = [(joint, joint) for joint in upper + lower]
        forward = [(j, k) for j in upper for k in lower]
        reverse = [(j, k) for j in lower for k in upper]
        edges.append([identity, forward, reverse])

    return edges


class Graph:
    """Hierarchical skeleton graph.

    Attributes set in ``__init__``:
        num_node: module-level ``num_node``.
        CoM: the grouping selector passed to ``get_groups``.
        A: tuple ``(adjacency, CoM)`` where ``adjacency`` has shape
           ``(L, 3, num_node, num_node)``.
    """

    def __init__(self, CoM=18, labeling_mode='spatial'):
        self.num_node = num_node
        self.CoM = CoM
        self.A = self.get_adjacency_matrix(labeling_mode)

    def get_adjacency_matrix(self, labeling_mode=None):
        """Return ``(A, CoM)`` for ``'spatial'``, or the cached ``self.A`` for ``None``.

        Raises:
            ValueError: for any other ``labeling_mode``.
        """
        if labeling_mode is None:
            return self.A
        if labeling_mode == 'spatial':
            A = get_hierarchical_graph(
                num_node, get_edgeset(dataset='NTU', CoM=self.CoM)
            )  # L, 3, num_node, num_node
        else:
            raise ValueError('unsupported labeling_mode: {!r}'.format(labeling_mode))
        return A, self.CoM
以及网络的部分修改:
hd_gcn.py:
import torch
import torch.nn as nn
import math

import numpy as np
from einops import rearrange, repeat

from net.HDhierarchy import get_groups


def import_class(name):
    """Resolve a dotted path such as ``'pkg.module.Class'`` to the named object.

    The top-level package is imported and the remaining components are looked
    up with ``getattr``, so intermediate submodules must already be imported.
    """
    components = name.split('.')
    mod = __import__(components[0])
    for comp in components[1:]:
        mod = getattr(mod, comp)
    return mod


def conv_branch_init(conv, branches):
    """Normal-initialize ``conv.weight`` with a std scaled by ``branches``; zero the bias."""
    weight = conv.weight
    n = weight.size(0)
    k1 = weight.size(1)
    k2 = weight.size(2)
    nn.init.normal_(weight, 0, math.sqrt(2. / (n * k1 * k2 * branches)))
    if conv.bias is not None:
        nn.init.constant_(conv.bias, 0)


def conv_init(conv):
    """Kaiming-normal (fan_out) init for the weight, zero init for the bias."""
    if conv.weight is not None:
        nn.init.kaiming_normal_(conv.weight, mode='fan_out')
    if conv.bias is not None:
        nn.init.constant_(conv.bias, 0)


def bn_init(bn, scale):
    """Set the batch-norm weight to ``scale`` and its bias to 0."""
    nn.init.constant_(bn.weight, scale)
    nn.init.constant_(bn.bias, 0)


def weights_init(m):
    """Initializer for ``Module.apply``; dispatches on the class name of ``m``."""
    classname = m.__class__.__name__
    if classname.find('Conv') != -1:
        if hasattr(m, 'weight'):
            nn.init.kaiming_normal_(m.weight, mode='fan_out')
        if hasattr(m, 'bias') and m.bias is not None and isinstance(m.bias, torch.Tensor):
            nn.init.constant_(m.bias, 0)
    elif classname.find('BatchNorm') != -1:
        if hasattr(m, 'weight') and m.weight is not None:
            m.weight.data.normal_(1.0, 0.02)
        if hasattr(m, 'bias') and m.bias is not None:
            m.bias.data.fill_(0)


class TemporalConv(nn.Module):
    """Convolution along the first spatial axis (kernel ``(k, 1)``) plus bias and batch norm."""

    def __init__(self, in_channels, out_channels, kernel_size, stride=1, dilation=1):
        super().__init__()
        # "Same" padding for the dilated kernel.
        pad = (kernel_size + (kernel_size - 1) * (dilation - 1) - 1) // 2
        self.conv = nn.Conv2d(
            in_channels,
            out_channels,
            kernel_size=(kernel_size, 1),
            padding=(pad, 0),
            stride=(stride, 1),
            dilation=(dilation, 1),
            bias=False)
        # The bias is kept as a separate parameter instead of the conv's own bias.
        self.bias = nn.Parameter(torch.zeros(1, out_channels, 1, 1), requires_grad=True)
        self.bn = nn.BatchNorm2d(out_channels)

    def forward(self, x):
        x = self.conv(x) + self.bias
        x = self.bn(x)
        return x


class MultiScale_TemporalConv(nn.Module):
    """Parallel temporal branches whose outputs are concatenated on the channel axis.

    There are ``len(dilations)`` dilated-convolution branches, one max-pool
    branch and one 1x1 branch, so ``out_channels`` must be divisible by
    ``len(dilations) + 2``.
    """

    def __init__(self,
                 in_channels,
                 out_channels,
                 kernel_size=5,
                 stride=1,
                 dilations=(1, 2),
                 residual=True,
                 residual_kernel_size=1):
        super().__init__()
        assert out_channels % (len(dilations) + 2) == 0, '# out channels should be multiples of # branches'

        # Multiple branches of temporal convolution
        self.num_branches = len(dilations) + 2
        branch_channels = out_channels // self.num_branches
        if isinstance(kernel_size, list):
            assert len(kernel_size) == len(dilations)
        else:
            kernel_size = [kernel_size] * len(dilations)

        # Temporal Convolution branches
        self.branches = nn.ModuleList([
            nn.Sequential(
                nn.Conv2d(in_channels, branch_channels, kernel_size=1, padding=0),
                nn.BatchNorm2d(branch_channels),
                nn.ReLU(inplace=True),
                TemporalConv(
                    branch_channels,
                    branch_channels,
                    kernel_size=ks,
                    stride=stride,
                    dilation=dilation),
            )
            for ks, dilation in zip(kernel_size, dilations)
        ])

        # Additional Max & 1x1 branch
        self.branches.append(nn.Sequential(
            nn.Conv2d(in_channels, branch_channels, kernel_size=1, padding=0),
            nn.BatchNorm2d(branch_channels),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=(3, 1), stride=(stride, 1), padding=(1, 0)),
            nn.BatchNorm2d(branch_channels)
        ))

        self.branches.append(nn.Sequential(
            nn.Conv2d(in_channels, branch_channels, kernel_size=1, padding=0, stride=(stride, 1)),
            nn.BatchNorm2d(branch_channels)
        ))

        # Residual connection
        if not residual:
            self.residual = lambda x: 0
        elif (in_channels == out_channels) and (stride == 1):
            self.residual = lambda x: x
        else:
            self.residual = TemporalConv(in_channels, out_channels,
                                         kernel_size=residual_kernel_size, stride=stride)

        # initialize
        self.apply(weights_init)

    def forward(self, x):
        branch_outs = []
        for tempconv in self.branches:
            out = tempconv(x)
            branch_outs.append(out)

        out = torch.cat(branch_outs, dim=1)
        out += self.residual(x)
        return out


class residual_conv(nn.Module):
    """Strided ``(k, 1)`` convolution followed by batch norm, used as a residual path."""

    def __init__(self, in_channels, out_channels, kernel_size=5, stride=1):
        super().__init__()
        pad = int((kernel_size - 1) / 2)
        self.conv = nn.Conv2d(in_channels, out_channels, kernel_size=(kernel_size, 1),
                              padding=(pad, 0), stride=(stride, 1))
        self.bn = nn.BatchNorm2d(out_channels)
        self.relu = nn.ReLU(inplace=True)  # defined but not applied in forward
        conv_init(self.conv)
        bn_init(self.bn, 1)

    def forward(self, x):
        x = self.bn(self.conv(x))
        return x


class EdgeConv(nn.Module):
    """Edge convolution over the ``k`` nearest neighbours in feature space."""

    def __init__(self, in_channels, out_channels, k):
        super().__init__()
        self.k = k
        self.conv = nn.Sequential(
            nn.Conv2d(in_channels * 2, out_channels, kernel_size=1, bias=False),
            nn.BatchNorm2d(out_channels),
            nn.LeakyReLU(inplace=True, negative_slope=0.2)
        )

        for m in self.modules():
            if isinstance(m, nn.Conv2d):
                conv_init(m)
            elif isinstance(m, nn.BatchNorm2d):
                bn_init(m, 1)

    def forward(self, x, dim=4):  # N, C, T, V
        """Apply the edge convolution.

        With ``dim == 3`` the input is ``(N, C, L)`` and the output keeps three
        axes. Otherwise the input is ``(N, C, T, V)``: it is averaged over
        ``T`` first and the result is repeated ``T`` times on the way out.
        """
        if dim == 3:
            N, C, L = x.size()
        else:
            N, C, T, V = x.size()
            x = x.mean(dim=-2, keepdim=False)  # N, C, V

        x = self.get_graph_feature(x, self.k)
        x = self.conv(x)
        x = x.max(dim=-1, keepdim=False)[0]

        if dim != 3:
            x = repeat(x, 'n c v -> n c t v', t=T)

        return x

    def knn(self, x, k):
        """Return the indices ``(N, V, k)`` of the ``k`` closest nodes for every node."""
        inner = -2 * torch.matmul(x.transpose(2, 1), x)  # N, V, V
        xx = torch.sum(x ** 2, dim=1, keepdim=True)
        # Negative squared Euclidean distance, so topk picks the nearest nodes.
        pairwise_distance = - xx - inner - xx.transpose(2, 1)

        idx = pairwise_distance.topk(k=k, dim=-1)[1]  # N, V, k
        return idx

    def get_graph_feature(self, x, k, idx=None):
        """Gather neighbour features and return ``(N, 2C, V, k)``.

        The channel axis concatenates ``neighbour - centre`` and ``centre``.
        """
        N, C, V = x.size()
        if idx is None:
            idx = self.knn(x, k=k)

        # x.device works for CPU tensors too; x.get_device() returns -1 on CPU,
        # which torch.arange does not accept as a device.
        idx_base = torch.arange(0, N, device=x.device).view(-1, 1, 1) * V

        idx = idx + idx_base
        idx = idx.view(-1)

        x = rearrange(x, 'n c v -> n v c')
        feature = rearrange(x, 'n v c -> (n v) c')[idx, :]
        feature = feature.view(N, V, k, C)
        x = repeat(x, 'n v c -> n v k c', k=k)

        feature = torch.cat((feature - x, x), dim=3)
        feature = rearrange(feature, 'n v k c -> n c v k')

        return feature


class AHA(nn.Module):
    """Attention over the hierarchy-layer axis of a ``(N, C, L, T, V)`` tensor."""

    def __init__(self, in_channels, num_layers, CoM):
        super().__init__()

        self.num_layers = num_layers

        # Joint ids from get_groups are 1-based; convert them to 0-based indices.
        groups = [[joint - 1 for joint in group]
                  for group in get_groups(dataset='NTU', CoM=CoM)]

        inter_channels = in_channels // 4

        # Joints taking part in hierarchy layer i: groups i and i + 1.
        self.layers = [groups[i] + groups[i + 1] for i in range(len(groups) - 1)]

        self.conv_down = nn.Sequential(
            nn.Conv2d(in_channels, inter_channels, kernel_size=1),
            nn.BatchNorm2d(inter_channels),
            nn.ReLU(inplace=True)
        )

        self.edge_conv = EdgeConv(inter_channels, inter_channels, k=3)

        self.aggregate = nn.Conv1d(inter_channels, in_channels, kernel_size=1)
        self.sigmoid = nn.Sigmoid()

    def forward(self, x):
        N, C, L, T, V = x.size()

        x_t = x.max(dim=-2, keepdim=False)[0]  # max over T -> N, C, L, V
        x_t = self.conv_down(x_t)

        x_sampled = []
        for i in range(self.num_layers):
            # Average layer i over the joints that belong to it.
            s_t = x_t[:, :, i, self.layers[i]]
            s_t = s_t.mean(dim=-1, keepdim=True)
            x_sampled.append(s_t)
        x_sampled = torch.cat(x_sampled, dim=2)

        att = self.edge_conv(x_sampled, dim=3)
        att = self.aggregate(att).view(N, C, L, 1, 1)

        # Weight each layer by its attention and sum the layers away.
        out = (x * self.sigmoid(att)).sum(dim=2, keepdim=False)

        return out


class HD_Gconv(nn.Module):
    """Graph convolution over a learnable hierarchical adjacency ``A`` of shape ``(L, S, V, V)``."""

    def __init__(self, in_channels, out_channels, A, adaptive=True, residual=True, att=False, CoM=18):
        super().__init__()
        self.num_layers = A.shape[0]
        self.num_subset = A.shape[1]

        self.att = att

        # One share per adjacency subset plus one for the EdgeConv branch.
        inter_channels = out_channels // (self.num_subset + 1)
        self.adaptive = adaptive

        if adaptive:
            self.PA = nn.Parameter(torch.from_numpy(A.astype(np.float32)), requires_grad=True)
        else:
            raise ValueError('HD_Gconv only supports adaptive=True')

        self.conv_down = nn.ModuleList()
        self.conv = nn.ModuleList()
        for i in range(self.num_layers):
            # NOTE(review): conv_d is (re)assigned as a module attribute on every
            # iteration, so it ends up aliasing the last layer's list. It is kept
            # as an attribute because turning it into a local would presumably
            # change the parameter names stored in existing checkpoints - confirm
            # before cleaning up.
            self.conv_d = nn.ModuleList()
            self.conv_down.append(nn.Sequential(
                nn.Conv2d(in_channels, inter_channels, kernel_size=1),
                nn.BatchNorm2d(inter_channels),
                nn.ReLU(inplace=True)
            ))
            for j in range(self.num_subset):
                self.conv_d.append(nn.Sequential(
                    nn.Conv2d(inter_channels, inter_channels, kernel_size=1),
                    nn.BatchNorm2d(inter_channels)
                ))

            self.conv_d.append(EdgeConv(inter_channels, inter_channels, k=5))
            self.conv.append(self.conv_d)

        if self.att:
            self.aha = AHA(out_channels, num_layers=self.num_layers, CoM=CoM)

        if residual:
            if in_channels != out_channels:
                self.down = nn.Sequential(
                    nn.Conv2d(in_channels, out_channels, 1),
                    nn.BatchNorm2d(out_channels)
                )
            else:
                self.down = lambda x: x
        else:
            self.down = lambda x: 0

        self.bn = nn.BatchNorm2d(out_channels)

        # 7 conv layers
        self.relu = nn.ReLU(inplace=True)

        for m in self.modules():
            if isinstance(m, nn.Conv2d):
                conv_init(m)
            elif isinstance(m, nn.BatchNorm2d):
                bn_init(m, 1)
        # The output batch norm starts with a near-zero scale.
        bn_init(self.bn, 1e-6)

    def forward(self, x):
        A = self.PA

        out = []
        for i in range(self.num_layers):
            y = []
            x_down = self.conv_down[i](x)
            for j in range(self.num_subset):
                z = torch.einsum('n c t u, v u -> n c t v', x_down, A[i, j])
                z = self.conv[i][j](z)
                y.append(z)
            # The last entry of each layer's list is the EdgeConv branch.
            y_edge = self.conv[i][-1](x_down)
            y.append(y_edge)
            y = torch.cat(y, dim=1)

            out.append(y)

        out = torch.stack(out, dim=2)  # N, C, L, T, V
        if self.att:
            out = self.aha(out)
        else:
            out = out.sum(dim=2, keepdim=False)

        out = self.bn(out)

        out += self.down(x)
        out = self.relu(out)

        return out


class TCN_GCN_unit(nn.Module):
    """One ``HD_Gconv`` followed by a ``MultiScale_TemporalConv`` with an outer residual."""

    def __init__(self, in_channels, out_channels, A, stride=1, residual=True, adaptive=True,
                 kernel_size=5, dilations=(1, 2), att=True, CoM=18):
        super().__init__()
        self.gcn1 = HD_Gconv(in_channels, out_channels, A, adaptive=adaptive, att=att, CoM=CoM)
        self.tcn1 = MultiScale_TemporalConv(out_channels, out_channels, kernel_size=kernel_size,
                                            stride=stride, dilations=dilations,
                                            residual=False)
        self.relu = nn.ReLU(inplace=True)
        if not residual:
            self.residual = lambda x: 0
        elif (in_channels == out_channels) and (stride == 1):
            self.residual = lambda x: x
        else:
            self.residual = residual_conv(in_channels, out_channels, kernel_size=1, stride=stride)

    def forward(self, x):
        y = self.relu(self.tcn1(self.gcn1(x)) + self.residual(x))
        return y


class Model(nn.Module):
    """Ten stacked ``TCN_GCN_unit`` blocks followed by a linear classifier.

    Args:
        num_class: size of the classifier output.
        num_point: number of joints per person.
        num_person: number of persons per sample. ``data_bn`` has
            ``num_person * in_channels * num_point`` features, so this must
            match the data.
        graph: dotted path of the graph class (resolved with ``import_class``).
        graph_args: keyword arguments for the graph class; ``None`` means none.
        in_channels: number of input channels per joint.
        drop_out: dropout probability; a falsy value disables dropout.
        adaptive: forwarded to every ``TCN_GCN_unit``.

    Raises:
        ValueError: if ``graph`` is ``None``.
    """

    def __init__(self, num_class=2, num_point=18, num_person=1, graph=None, graph_args=None,
                 in_channels=3, drop_out=0, adaptive=True):
        super().__init__()

        if graph is None:
            raise ValueError('graph must be the dotted path of a graph class')
        Graph = import_class(graph)
        self.graph = Graph(**(graph_args or {}))

        # The graph exposes its adjacency together with the CoM it was built for.
        A, CoM = self.graph.A

        self.dataset = 'NTU' if num_point == 18 else 'UCLA'

        self.num_class = num_class
        self.num_point = num_point
        self.data_bn = nn.BatchNorm1d(num_person * in_channels * num_point)

        base_channels = 64

        # in_channels instead of a literal 3 so the first layer agrees with data_bn.
        self.l1 = TCN_GCN_unit(in_channels, base_channels, A, residual=False, adaptive=adaptive, att=False, CoM=CoM)
        self.l2 = TCN_GCN_unit(base_channels, base_channels, A, adaptive=adaptive, CoM=CoM)
        self.l3 = TCN_GCN_unit(base_channels, base_channels, A, adaptive=adaptive, CoM=CoM)
        self.l4 = TCN_GCN_unit(base_channels, base_channels, A, adaptive=adaptive, CoM=CoM)
        self.l5 = TCN_GCN_unit(base_channels, base_channels * 2, A, stride=2, adaptive=adaptive, CoM=CoM)
        self.l6 = TCN_GCN_unit(base_channels * 2, base_channels * 2, A, adaptive=adaptive, CoM=CoM)
        self.l7 = TCN_GCN_unit(base_channels * 2, base_channels * 2, A, adaptive=adaptive, CoM=CoM)
        self.l8 = TCN_GCN_unit(base_channels * 2, base_channels * 4, A, stride=2, adaptive=adaptive, CoM=CoM)
        self.l9 = TCN_GCN_unit(base_channels * 4, base_channels * 4, A, adaptive=adaptive, CoM=CoM)
        self.l10 = TCN_GCN_unit(base_channels * 4, base_channels * 4, A, adaptive=adaptive, CoM=CoM)

        self.fc = nn.Linear(base_channels * 4, num_class)
        nn.init.normal_(self.fc.weight, 0, math.sqrt(2. / num_class))
        bn_init(self.data_bn, 1)
        if drop_out:
            self.drop_out = nn.Dropout(drop_out)
        else:
            self.drop_out = lambda x: x

    def forward(self, x):
        """Classify a batch shaped ``(N, C, T, V, M)``; returns ``(N, num_class)``."""
        N, C, T, V, M = x.size()
        x = rearrange(x, 'n c t v m -> n (m v c) t')
        x = self.data_bn(x)
        x = rearrange(x, 'n (m v c) t -> (n m) c t v', m=M, v=V)

        x = self.l1(x)
        x = self.l2(x)
        x = self.l3(x)
        x = self.l4(x)
        x = self.l5(x)
        x = self.l6(x)
        x = self.l7(x)
        x = self.l8(x)
        x = self.l9(x)
        x = self.l10(x)

        # N*M,C,T,V
        c_new = x.size(1)
        x = x.view(N, M, c_new, -1)
        # Average over (T, V), then over persons.
        x = x.mean(3).mean(1)
        x = self.drop_out(x)

        return self.fc(x)
最后我们编写训练所需的yaml配置文件:
work_dir: ./work_dir/recognition/kinetics_skeleton/HD_GCN

# feeder
feeder: feeder.feeder.Feeder
train_feeder_args:
  random_choose: True
  random_move: True
  window_size: 30
  data_path: C:/WorkFiles/company_server_SSH/st-gcn-master/dataset/HDdataset/train_data.npy
  label_path: C:/WorkFiles/company_server_SSH/st-gcn-master/dataset/HDdataset/train_label.pkl
test_feeder_args:
  data_path: C:/WorkFiles/company_server_SSH/st-gcn-master/dataset/HDdataset/val_data.npy
  label_path: C:/WorkFiles/company_server_SSH/st-gcn-master/dataset/HDdataset/val_label.pkl

# model
model: net.hd_gcn.Model
model_args:
  in_channels: 3
  num_class: 2
  # Must match the number of persons per sample in the dataset.
  num_person: 1
  graph: net.HDhierarchy.Graph
  graph_args:
    labeling_mode: 'spatial'
    CoM: 18

# training
device: [0]
batch_size: 64
test_batch_size: 64

# optim
base_lr: 0.01
step: [20, 40, 60, 80]
num_epoch: 100
另外有一点需要注意:如果训练数据集中每个样本包含多个人,需要相应地修改配置中的 num_person。data_bn 的特征维度是 num_person × in_channels × num_point,num_person 与数据不一致时,data_bn 会报 BatchNorm 维度不匹配的错误。