The Backpropagation Algorithm: Implementation Details of loss.backward()
- Forward pass: feed the input data through the model to obtain predictions.
- Backward pass: compute the gradients and update the parameters.
- Backpropagation: the algorithm that computes the gradients.
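In PyTorch these steps correspond roughly to the following minimal sketch (the toy tensors `w`, `x`, `y` and the learning rate are made up for illustration):

```python
import torch

# toy parameter and data (hypothetical, for illustration only)
w = torch.tensor(0.0, requires_grad=True)
x, y = torch.tensor(2.0), torch.tensor(6.0)

y_hat = w * x              # forward pass: compute the prediction
loss = (y_hat - y) ** 2    # scalar loss
loss.backward()            # backpropagation: fills w.grad with d(loss)/dw
with torch.no_grad():
    w -= 0.1 * w.grad      # the backward pass also updates the parameter
    w.grad.zero_()
```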
Computational Graph
Computational graph = directed acyclic graph (DAG) + basic operations
- Nodes: variable nodes & computation nodes
- Directed edges: represent computational dependencies
- Basic operations: e.g. the four arithmetic operations, sigmoid, etc.
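As a point of reference, PyTorch builds exactly this kind of DAG on the fly: every result tensor keeps a `grad_fn` pointing at the operation that produced it. A small sketch (variable names are arbitrary):

```python
import torch

a = torch.tensor(1.0, requires_grad=True)
b = torch.tensor(2.0, requires_grad=True)
d = a + b             # computation node created by '+'
e = torch.sigmoid(d)  # computation node created by sigmoid
print(d.grad_fn)      # <AddBackward0 object at ...>
print(e.grad_fn)      # <SigmoidBackward0 object at ...>
```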
```python
import torch
import torch.nn as nn
from graphviz import Digraph
```
Define a class `ScalarTmp` to represent nodes in the computational graph:
- Constructor parameters: `value` is the node's value, `prevs` are its direct predecessor nodes, `op` is the operator symbol (for computation nodes), and `label` is the variable name.
- Define the class's string representation.
- Define the functions triggered by `+` and `*` (each also computes the local gradients of the current node).
```python
# Computational graph
class ScalarTmp:
    def __init__(self, value, prevs=[], op=None, label=''):
        # value: the node's value; prevs: direct predecessor nodes;
        # op: operator symbol (for computation nodes); label: variable name
        self.value = value
        self.prevs = prevs
        self.op = op
        self.label = label
        self.grad = 0.0     # global gradient
        self.grad_wrt = {}  # local gradients

    # string representation of the class
    def __repr__(self):
        return f'{self.value} | {self.op} | {self.label}'

    def __add__(self, other):
        # triggered by self + other
        value = self.value + other.value
        prevs = [self, other]
        output = ScalarTmp(value, prevs, op='+')
        output.grad_wrt[self] = 1
        output.grad_wrt[other] = 1
        return output

    def __mul__(self, other):
        # triggered by self * other
        value = self.value * other.value
        prevs = [self, other]
        output = ScalarTmp(value, prevs, op='*')
        output.grad_wrt[self] = other.value
        output.grad_wrt[other] = self.value
        return output
```
Drawing the Graph
```python
def _trace(root):
    # traverse all nodes and edges
    nodes, edges = set(), set()  # sets store unique elements, in no particular order

    def _build(v):
        if v not in nodes:
            nodes.add(v)
            for prev in v.prevs:
                edges.add((prev, v))
                _build(prev)

    _build(root)
    return nodes, edges


def draw_graph(root, direction='forward'):
    nodes, edges = _trace(root)
    rankdir = 'BT' if direction == 'forward' else 'TB'
    graph = Digraph(format='svg', graph_attr={'rankdir': rankdir})
    # SVG (Scalable Vector Graphics) is a vector format used on the web
    # rankdir defines the layout direction of the graph:
    # 'TB': top to bottom;  'LR': left to right
    # 'BT': bottom to top;  'RL': right to left
    # draw the nodes
    for node in nodes:
        label = node.label if node.op is None else node.op
        node_attr = f'{{grad={node.grad:.2f} | value={node.value:.2f} | {label}}}'
        uid = str(id(node))
        graph.node(name=uid, label=node_attr, shape='record')
    # draw the edges
    for edge in edges:
        id1 = str(id(edge[0]))
        id2 = str(id(edge[1]))
        graph.edge(id1, id2)
    return graph
```
Topological sorting with depth-first search
`ordered` stores the sorted node sequence; `visited` is a set that records nodes already visited, so that no node is visited twice.
```python
# Topological sort
def _top_order(root):
    # sort via depth-first search
    # ordered stores the sorted node sequence; visited records already-visited nodes
    ordered, visited = [], set()

    def _add_prevs(node):
        if node not in visited:
            visited.add(node)
            for prev in node.prevs:
                _add_prevs(prev)
            ordered.append(node)  # append the current node at the end of ordered

    _add_prevs(root)
    return ordered
```
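A quick check of the ordering, previewing the simple graph built below (this snippet assumes the `ScalarTmp` class and `_top_order` defined above):

```python
a = ScalarTmp(1.0, label='a')
b = ScalarTmp(2.0, label='b')
c = ScalarTmp(4.0, label='c')
f = (a + b) * (a * c)
print([n.label or n.op for n in _top_order(f)])
# ['a', 'b', '+', 'c', '*', '*'] -- every node appears after all of its prevs
```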
Computing the Gradients
Chain rule: `v.grad += node.grad * node.grad_wrt[v]`
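For example, with $d = a + b$, $e = a \cdot c$ and $f = d \cdot e$, node $a$ receives contributions along two paths, which is exactly why the code accumulates with `+=`:

$$
\frac{\partial f}{\partial a}
= \frac{\partial f}{\partial d}\,\frac{\partial d}{\partial a}
+ \frac{\partial f}{\partial e}\,\frac{\partial e}{\partial a}
= e \cdot 1 + d \cdot c ,
$$

which equals $4 + 3 \cdot 4 = 16$ for the values used in the example below.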
```python
# compute the gradients
def backward(root):
    # the gradient of the root node is defined to be 1
    root.grad = 1.0
    ordered = _top_order(root)
    for node in reversed(ordered):  # traverse in reverse topological order
        for v in node.prevs:
            v.grad += node.grad * node.grad_wrt[v]
    return root
```
Drawing a Simple Computational Graph
```python
a = ScalarTmp(value=1.0, label='a')
b = ScalarTmp(value=2.0, label='b')
c = ScalarTmp(value=4.0, label='c')
d = a + b
e = a * c
f = d * e
backward(f)
draw_graph(f)
```
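As a sanity check (a sketch, not part of the code above), the same gradients can be reproduced with PyTorch's autograd:

```python
import torch

a = torch.tensor(1.0, requires_grad=True)
b = torch.tensor(2.0, requires_grad=True)
c = torch.tensor(4.0, requires_grad=True)
f = (a + b) * (a * c)
f.backward()
print(a.grad, b.grad, c.grad)  # tensor(16.) tensor(4.) tensor(3.)
```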
Refine the computational-graph code and save it to the file utils.py.
The source code is as follows:
```python
from graphviz import Digraph
import math


class Scalar:
    def __init__(self, value, prevs=[], op=None, label='', requires_grad=True):
        # the node's value
        self.value = value
        # the node's label and operation (op), used for drawing
        self.label = label
        self.op = op
        # the node's predecessors: the current node is the result of an operation,
        # and the predecessors are the operands
        self.prevs = prevs
        # whether the partial derivative of this node is needed,
        # i.e. ∂loss/∂self (loss is the final model loss)
        self.requires_grad = requires_grad
        # the partial derivative of this node, i.e. ∂loss/∂self
        self.grad = 0.0
        # if prevs is non-empty, stores every ∂self/∂prev
        self.grad_wrt = dict()
        # only needed for drawing; has no effect on the computation
        self.back_prop = dict()

    def __repr__(self):
        return f'Scalar(value={self.value:.2f}, grad={self.grad:.2f})'

    def __add__(self, other):
        '''
        Addition: self + other triggers this function
        '''
        if not isinstance(other, Scalar):
            other = Scalar(other, requires_grad=False)
        # output = self + other
        output = Scalar(self.value + other.value, [self, other], '+')
        output.requires_grad = self.requires_grad or other.requires_grad
        # partial derivative ∂output/∂self = 1
        output.grad_wrt[self] = 1
        # partial derivative ∂output/∂other = 1
        output.grad_wrt[other] = 1
        return output

    def __sub__(self, other):
        '''
        Subtraction: self - other triggers this function
        '''
        if not isinstance(other, Scalar):
            other = Scalar(other, requires_grad=False)
        # output = self - other
        output = Scalar(self.value - other.value, [self, other], '-')
        output.requires_grad = self.requires_grad or other.requires_grad
        # partial derivative ∂output/∂self = 1
        output.grad_wrt[self] = 1
        # partial derivative ∂output/∂other = -1
        output.grad_wrt[other] = -1
        return output

    def __mul__(self, other):
        '''
        Multiplication: self * other triggers this function
        '''
        if not isinstance(other, Scalar):
            other = Scalar(other, requires_grad=False)
        # output = self * other
        output = Scalar(self.value * other.value, [self, other], '*')
        output.requires_grad = self.requires_grad or other.requires_grad
        # partial derivative ∂output/∂self = other
        output.grad_wrt[self] = other.value
        # partial derivative ∂output/∂other = self
        output.grad_wrt[other] = self.value
        return output

    def __pow__(self, other):
        '''
        Power: self ** other triggers this function
        '''
        assert isinstance(other, (int, float))
        # output = self ** other
        output = Scalar(self.value ** other, [self], f'^{other}')
        output.requires_grad = self.requires_grad
        # partial derivative ∂output/∂self = other * self**(other-1)
        output.grad_wrt[self] = other * self.value ** (other - 1)
        return output

    def sigmoid(self):
        '''
        The sigmoid function
        '''
        s = 1 / (1 + math.exp(-1 * self.value))
        output = Scalar(s, [self], 'sigmoid')
        output.requires_grad = self.requires_grad
        # partial derivative ∂output/∂self = output * (1 - output)
        output.grad_wrt[self] = s * (1 - s)
        return output

    def __rsub__(self, other):
        '''
        Right subtraction: other - self triggers this function
        '''
        if not isinstance(other, Scalar):
            other = Scalar(other, requires_grad=False)
        output = Scalar(other.value - self.value, [self, other], '-')
        output.requires_grad = self.requires_grad or other.requires_grad
        # partial derivative ∂output/∂self = -1
        output.grad_wrt[self] = -1
        # partial derivative ∂output/∂other = 1
        output.grad_wrt[other] = 1
        return output

    def __radd__(self, other):
        '''
        Right addition: other + self triggers this function
        '''
        return self.__add__(other)

    def __rmul__(self, other):
        '''
        Right multiplication: other * self triggers this function
        '''
        return self * other

    def backward(self, fn=None):
        '''
        Starting from the current node, compute the partial derivative of every node
        in the computational graph rooted at the current node, i.e. ∂self/∂node

        Parameters
        ----
        fn : drawing function; if not None, a record of every step of the
             backward pass is returned

        Returns
        ----
        re : record of every step of the backward pass
        '''
        def _topological_order():
            '''
            Return the topological sorting of the computational graph,
            using depth-first search
            '''
            def _add_prevs(node):
                if node not in visited:
                    visited.add(node)
                    for prev in node.prevs:
                        _add_prevs(prev)
                    ordered.append(node)

            ordered, visited = [], set()
            _add_prevs(self)
            return ordered

        def _compute_grad_of_prevs(node):
            '''
            Starting from node, propagate backwards
            '''
            # only needed for drawing; has no effect on the computation
            node.back_prop = dict()
            # the gradient of the current node within this computational graph.
            # Since a node can appear in several computational graphs,
            # cg_grad records the gradient of the current graph
            dnode = cg_grad[node]
            # node.grad accumulates the gradient
            node.grad += dnode
            for prev in node.prevs:
                # the partial derivative of node is complete, so it can be
                # propagated backwards. Note that the propagation to upstream
                # nodes is accumulative
                grad_spread = dnode * node.grad_wrt[prev]
                cg_grad[prev] = cg_grad.get(prev, 0.0) + grad_spread
                node.back_prop[prev] = node.back_prop.get(prev, 0.0) + grad_spread

        # the partial derivative of the current node equals 1, since ∂self/∂self = 1.
        # This is the starting point of the backpropagation algorithm
        cg_grad = {self: 1}
        # traverse the computational graph in reverse topological order
        ordered = reversed(_topological_order())
        re = []
        for node in ordered:
            _compute_grad_of_prevs(node)
            # only needed for drawing; has no effect on the computation
            if fn is not None:
                re.append(fn(self, 'backward'))
        return re


def _get_node_attr(node, direction='forward'):
    '''
    Attributes of a node
    '''
    node_type = _get_node_type(node)
    # set the font
    res = {'fontname': 'Menlo'}

    def _forward_attr():
        if node_type == 'param':
            node_text = f'{{ grad=None | value={node.value:.2f} | {node.label}}}'
            res.update(dict(label=node_text, shape='record', fontsize='10',
                            fillcolor='lightgreen', style='filled, bold'))
            return res
        elif node_type == 'computation':
            node_text = f'{{ grad=None | value={node.value:.2f} | {node.op}}}'
            res.update(dict(label=node_text, shape='record', fontsize='10',
                            fillcolor='gray94', style='filled, rounded'))
            return res
        elif node_type == 'input':
            if node.label == '':
                node_text = f'input={node.value:.2f}'
            else:
                node_text = f'{node.label}={node.value:.2f}'
            res.update(dict(label=node_text, shape='oval', fontsize='10'))
            return res

    def _backward_attr():
        attr = _forward_attr()
        attr['label'] = attr['label'].replace('grad=None', f'grad={node.grad:.2f}')
        if not node.requires_grad:
            attr['style'] = 'dashed'
        # purely cosmetic:
        # if the backward-propagated gradient equals 0, or it is propagated to
        # nodes that do not require gradients, the node is drawn with dashed lines
        grad_back = [v if k.requires_grad else 0 for (k, v) in node.back_prop.items()]
        if len(grad_back) > 0 and sum(grad_back) == 0:
            attr['style'] = 'dashed'
        return attr

    if direction == 'forward':
        return _forward_attr()
    else:
        return _backward_attr()


def _get_node_type(node):
    '''
    Determine the node type: computation node, parameter, or input data
    '''
    if node.op is not None:
        return 'computation'
    if node.requires_grad:
        return 'param'
    return 'input'


def _trace(root):
    '''
    Traverse all nodes and edges in the graph
    '''
    nodes, edges = set(), set()

    def _build(v):
        if v not in nodes:
            nodes.add(v)
            for prev in v.prevs:
                edges.add((prev, v))
                _build(prev)

    _build(root)
    return nodes, edges


def _draw_node(graph, node, direction='forward'):
    '''
    Draw a node
    '''
    node_attr = _get_node_attr(node, direction)
    uid = str(id(node)) + direction
    graph.node(name=uid, **node_attr)


def _draw_edge(graph, n1, n2, direction='forward'):
    '''
    Draw an edge
    '''
    uid1 = str(id(n1)) + direction
    uid2 = str(id(n2)) + direction

    def _draw_back_edge():
        if n1.requires_grad and n2.requires_grad:
            grad = n2.back_prop.get(n1, None)
            if grad is None:
                graph.edge(uid2, uid1, arrowhead='none', color='deepskyblue')
            elif grad == 0:
                graph.edge(uid2, uid1, style='dashed', label=f'{grad:.2f}',
                           color='deepskyblue', fontname='Menlo')
            else:
                graph.edge(uid2, uid1, label=f'{grad:.2f}',
                           color='deepskyblue', fontname='Menlo')
        else:
            graph.edge(uid2, uid1, style='dashed', arrowhead='none', color='deepskyblue')

    if direction == 'forward':
        graph.edge(uid1, uid2)
    elif direction == 'backward':
        _draw_back_edge()
    else:
        _draw_back_edge()
        graph.edge(uid1, uid2)


def draw_graph(root, direction='forward'):
    '''
    Visualize the computational graph rooted at root

    Parameters
    ----
    root      : Scalar, the root of the computational graph
    direction : str, forward pass ('forward') or backward pass ('backward')

    Returns
    ----
    re : Digraph, the computational graph
    '''
    nodes, edges = _trace(root)
    rankdir = 'BT' if direction == 'forward' else 'TB'
    graph = Digraph(format='svg', graph_attr={'rankdir': rankdir})
    for item in nodes:
        _draw_node(graph, item, direction)
    for n1, n2 in edges:
        _draw_edge(graph, n1, n2, direction)
    return graph
```
Using the utils file to draw the computational graph
```python
from utils import Scalar, draw_graph
from IPython.display import display, clear_output
import time

a = Scalar(value=1.0, label='a')
b = Scalar(value=2.0, label='b')
c = Scalar(value=4.0, label='c')
d = a + b
e = a * c
f = d * e
backward_process = f.backward(draw_graph)
draw_graph(f, 'backward')
```
Inspecting each step of the gradient computation
```python
for i in backward_process:
    display(i)
    time.sleep(3)
    clear_output(wait=True)
```
Gradient Accumulation
Define the linear model Linear and the mean-squared-error function mse in the file linear_model.py.
```python
from utils import Scalar


def mse(errors):
    '''
    Compute the mean squared error
    '''
    n = len(errors)
    wrt = {}
    value = 0.0
    requires_grad = False
    for item in errors:
        value += item.value ** 2 / n
        wrt[item] = 2 / n * item.value
        requires_grad = requires_grad or item.requires_grad
    output = Scalar(value, errors, 'mse')
    output.requires_grad = requires_grad
    output.grad_wrt = wrt
    return output


class Linear:
    def __init__(self):
        '''
        Define the parameters of the linear regression model: a, b
        '''
        self.a = Scalar(0.0, label='a')
        self.b = Scalar(0.0, label='b')

    def forward(self, x):
        '''
        Model prediction under the current parameter estimates
        '''
        return self.a * x + self.b

    def error(self, x, y):
        '''
        Model error for the given data point
        '''
        return y - self.forward(x)

    def string(self):
        '''
        Describe the current model
        '''
        return f'y = {self.a.value:.2f} * x + {self.b.value:.2f}'
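A quick sanity check of the two definitions (a hypothetical standalone snippet, not part of linear_model.py; the expected values follow from a = b = 0):

```python
from utils import Scalar
from linear_model import Linear, mse

m = Linear()                                   # a = b = 0
x0 = Scalar(1.0, label='x0', requires_grad=False)
y0 = Scalar(3.0, label='y0', requires_grad=False)
loss = mse([m.error(x0, y0)])                  # (y0 - (a*x0 + b))**2 = 9
loss.backward()
print(m.a.grad, m.b.grad)                      # -6.0 -6.0, i.e. 2*(a*x0 + b - y0)*x0 and 2*(a*x0 + b - y0)
```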
Using utils.py and linear_model.py to build the linear model and examine the fit
```python
from utils import Scalar, draw_graph
from linear_model import Linear, mse

# generate the data
import torch

torch.manual_seed(1024)  # random seed
x = torch.linspace(100, 300, 200)
x = (x - torch.mean(x)) / torch.std(x)  # standardize the data to mean 0 and standard deviation 1
epsilon = torch.randn(x.shape)
y = 10 * x + 5 + epsilon

model = Linear()
batch_size = 20
learning_rate = 0.1

for t in range(20):
    ix = (t * batch_size) % len(x)
    xx = x[ix : ix + batch_size]
    yy = y[ix : ix + batch_size]
    loss = mse([model.error(_x, _y) for _x, _y in zip(xx, yy)])
    loss.backward()
    model.a -= learning_rate * model.a.grad
    model.b -= learning_rate * model.b.grad
    model.a.grad = 0.0
    model.b.grad = 0.0

print(model.string())
```
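For comparison, a sketch of the same mini-batch loop written directly against PyTorch's autograd (the parameter tensors `a_t` and `b_t` are new, illustrative names; the data `x`, `y` are reused from above):

```python
import torch

a_t = torch.tensor(0.0, requires_grad=True)
b_t = torch.tensor(0.0, requires_grad=True)
learning_rate, batch_size = 0.1, 20

for t in range(20):
    ix = (t * batch_size) % len(x)
    xx, yy = x[ix : ix + batch_size], y[ix : ix + batch_size]
    loss = torch.mean((yy - (a_t * xx + b_t)) ** 2)  # MSE of this mini-batch
    loss.backward()                                  # same chain rule, run by torch
    with torch.no_grad():
        a_t -= learning_rate * a_t.grad
        b_t -= learning_rate * b_t.grad
        a_t.grad.zero_()
        b_t.grad.zero_()

print(f'y = {a_t.item():.2f} * x + {b_t.item():.2f}')
```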
The full parameter-estimation process
Computational Graph Bloat
Draw the computational graph when there are two data points.
```python
# computational graph bloat
model = Linear()
# define two data points
x1 = Scalar(1.5, label='x1', requires_grad=False)
y1 = Scalar(1.0, label='y1', requires_grad=False)
x2 = Scalar(2.0, label='x2', requires_grad=False)
y2 = Scalar(4.0, label='y2', requires_grad=False)
loss = mse([model.error(x1, y1), model.error(x2, y2)])
loss.backward()
draw_graph(loss, 'backward')
```
As the figure shows, with n data points the computational graph contains n repeated sub-graphs (it gets "fat"). This phenomenon is called computational graph bloat, and its consequence is increased memory usage.
Gradient Accumulation
Because the backward pass for each data point does not depend on the backward passes of the other data points, we can accumulate the gradients over several passes and obtain the same result, avoiding computational graph bloat.
- The first pass

Because the model loss is an average, with n data points we sum the n per-point losses and divide by n; here that means multiplying by 1/2 to adjust the coefficient.
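In other words, for two points the averaged loss splits into two equally weighted single-point losses:

$$
\frac{e_1^2 + e_2^2}{2}
= \tfrac{1}{2}\, e_1^2 + \tfrac{1}{2}\, e_2^2
= \tfrac{1}{2}\,\mathrm{mse}([e_1]) + \tfrac{1}{2}\,\mathrm{mse}([e_2]),
$$

so each micro-pass computes `0.5 * mse([...])` and the accumulated gradients add up to the gradient of the full loss.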
```python
# the first pass
model = Linear()
loss = 0.5 * mse([model.error(x1, y1)])
loss.backward()
draw_graph(loss, 'backward')
```
The result is shown in the figure below; it is identical to the right half of the previous graph.
- The second pass

After the first pass we must not zero the gradients immediately: only after the second pass do we obtain the gradients we actually want, and only then do we update the parameters. So we run one more pass on top of the existing state.
```python
# the second pass (gradient accumulation)
loss = 0.5 * mse([model.error(x2, y2)])
loss.backward()
draw_graph(loss, 'backward')
```
The result is shown below; the gradients here are accumulated on top of those from the first pass.
The Full Flow
- Add a gradient-accumulation count: the model parameters are updated only after the gradients have been accumulated gradient_accu_iter times.
- Define the micro-batch size micro_size = batch_size / gradient_accu_iter and use micro_size when slicing the data.
- Increase the number of iterations: since the parameters are updated only every gradient_accu_iter steps, the loop runs 20 * gradient_accu_iter times.
- Re-weight the loss: multiply it by 1 / gradient_accu_iter.
```python
model = Linear()
batch_size = 20
learning_rate = 0.1
# number of gradient-accumulation steps
gradient_accu_iter = 4
# micro-batch size
micro_size = int(batch_size / gradient_accu_iter)

for t in range(20 * gradient_accu_iter):
    ix = (t * batch_size) % len(x)
    xx = x[ix : ix + micro_size]
    yy = y[ix : ix + micro_size]
    loss = mse([model.error(_x, _y) for _x, _y in zip(xx, yy)])
    # re-weight the loss
    loss *= 1 / gradient_accu_iter
    loss.backward()
    if (t + 1) % gradient_accu_iter == 0:
        model.a -= learning_rate * model.a.grad
        model.b -= learning_rate * model.b.grad
        model.a.grad = 0.0
        model.b.grad = 0.0

print(model.string())
```
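The same pattern maps directly onto ordinary PyTorch training code. A commonly used sketch (the model `model_t`, the optimizer, and the micro-batch indexing are illustrative choices, not taken from the code above):

```python
import torch

model_t = torch.nn.Linear(1, 1)                  # illustrative torch model
optimizer = torch.optim.SGD(model_t.parameters(), lr=0.1)
gradient_accu_iter, micro_size = 4, 5

for t in range(20 * gradient_accu_iter):
    ix = (t * micro_size) % len(x)
    xx = x[ix : ix + micro_size].unsqueeze(1)
    yy = y[ix : ix + micro_size].unsqueeze(1)
    loss = torch.nn.functional.mse_loss(model_t(xx), yy)
    (loss / gradient_accu_iter).backward()       # scale, then accumulate into .grad
    if (t + 1) % gradient_accu_iter == 0:
        optimizer.step()                         # update only every gradient_accu_iter steps
        optimizer.zero_grad()                    # zero the gradients only after the update
```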