前面学习了Dijkstra以及A* 算法的基本原理,对于这两种算法而言,我们都能在有解的情况下找到一条沿着起点到达终点的路径。然而,这两个算法本身都是基于静态地图的,也就是说:当机器人找到路径后开始沿着起点向终点运动的过程中,默认地图是不会发生变化的,但是实际上大部分时候我们知道地图是动态的,随时存在一个突然出现的障碍物挡在原有的路径上,那么这时候原有的路径就失效了。一种简单粗暴的方式当然是从当前点开始重新执行一次Dijkstra或者A* 搜索,但是这样效率会很低,例如障碍物只挡住了当前位置一点点,而因此执行一次全局搜索显然是不太合适的,那么有什么方式能够快速且有效的找到一条合适的路径吗?答案就是:D*
基本逻辑
D* 算法的出现,本身就是为了解决动态路径规划的。它的基本思路如下:
1、初始时,使用Dijkstra算法寻找到一条从起点到达终点的路径。注意一点的是在传统的Dijkstra算法中,只维护了一个代价值f[s],但是D* 中对于每个点维护了两个不同的代价值k[s]与h[s]。此外,注意D*的算法是从终点往起点搜索的,这点比较重要要注意一下,因为涉及到后面沿着parent的搜索。2、机器人沿着找到的路径前进,假设在某个位置突然出现了一个障碍物。3、算法将该点处的h值更新为无穷大(不可达),同时调用process_state处理状态,直到算法找到新的路径或者找完了发现没有路径跳出
我们一条条往下看,首先看第一条,初始搜索。这个基本原理与Dijkstra基本上是一致的,区别在于D* 为了维护一个准确的到达目标点的代价值所以它的搜索似乎是从终点往起点搜索的。此外,关于k[s]与h[s]的定义,原论文中是这么定义的:
For each state X on the OPEN list (i.e., r ( X ) = OPEN ) , the key function,k(G, X ) , is defined to be equal to the minimum of h(G, X )
before modification and all values assumed by h(G, X ) since X was placed on the OPEN list.
即每个点,其存在两个值k[s]与h[s],而k[s]则等于搜索过程中该点的最小代价值,即h[s]中遇到的最小的那个。这个有什么用呢?后面会讲到,让我们继续往下看。
在找到一条路径后,机器人开始沿着这条路径前进,直到触发障碍物,此时会讲该点的h值进行修改。修改规则在论文中是这么定义的:
这其中使用到了一个函数insert。这个函数在论文中没有细讲,翻看了一些其他人的文章中,统一的逻辑是这么处理的:
def insert(self, s, h_new):"""insert node into OPEN set.:param s: node:param h_new: new or better cost to come value"""if self.t[s] == 'NEW':self.k[s] = h_newelif self.t[s] == 'OPEN':self.k[s] = min(self.k[s], h_new)elif self.t[s] == 'CLOSED':self.k[s] = min(self.h[s], h_new)self.h[s] = h_newself.t[s] = 'OPEN'self.OPEN.add(s)
对于路径上是障碍物的点,它的h(x)为inf即无穷,所以这里算法只改变该点的h值而不会改变其k值,同时这个点被加入到open list内,然后算法会调用process_state函数进行处理,对于process_state,原论文中是这么定义的:
这里就是D* 算法的精髓所在了。当算法进入这个函数时,首先获取OPEN list中k值最小的点,并将该点从OPEN list中删除:
kold = GET - KMIN( ) ; DELETE(X)
而h(x)的计算则是与Dijkstra算法或者A*算法中的定义一致,代表了当前点到目标点的代价值。当一个点被识别为障碍物时,它的h值会变成inf,因此该点的h值肯定是大于K值的,这种状态在论文中被定义为RAISE的状态,即k(G,X)<h(G,X),另外如果k(G,X)=h(G、X)则为LOWER的状态。论文中提到:
If X is a RAISE state, its path cost may not be optimal.
D* 使用OPEN列表上的RAISE状态来传播有关路径成本增加的信息,使用LOWER状态来传播关于路径成本减少的信息。即,如果X是RAISE状态,则其路径成本可能不是最优的。这也比较好理解,本来第一次规划完的时候k(s)=h(s),而遇到障碍物后h(s)增加,即该点前往目标点的代价值因为障碍物而增加了,那么此时这个点的路径大部分情况下就不是最优的了,都要撞了嘛。
而同时,我们也会根据这个状态在该点及附近开始搜索出一条新的路径,搜索的方式就是按照上述的process_state来循环更新open list中每个点及其附近点的信息,直到满足:
if k_min >= self.h[s]:
这里k_min是指open list中最小的k值,s则是之前遇到障碍物的点,这里之前在执行modify的过程中它的值边变成了inf,但是后面在process_state中其值会被改变,同时改变的还有它周围的一些点,这里的含义也就是说算法从遇到障碍物的点开始带方向的搜索,其搜索方向就是偏向于目标点的,因为相对而言这些点的k值更小,更容易被进行搜索,这样就避免了一些无意义的搜索。
示例
我们通过一个简单的例子看一下这个问题:
假设当前存在如上一张地图,起点坐标为(5,5),终点为(45,25),算法初始时使用Dijkstra算法寻找到了一条最优路径,这条路径经过的点为:
[(5, 5), (6, 6), (7, 7), (8, 8), (9, 9), (9, 10), (9, 11), (9, 12), (9, 13), (9, 14), (9, 15), (9, 16), (10, 16), (11, 16), (12, 16), (13, 16), (14, 16), (15, 16), (16, 16), (17, 16), (18, 16), (19, 16), (20, 16), (21, 16), (22, 15), (23, 14), (24, 14), (25, 14), (26, 14), (27, 14), (28, 14), (29, 14), (30, 14), (31, 14), (32, 15), (33, 16), (34, 17), (35, 18), (36, 19), (37, 20), (38, 21), (39, 22), (40, 23), (41, 24), (42, 25), (43, 25), (44, 25), (45, 25)]
然后我们在(9,10)这个位置插入一个障碍物。此时机器人从起点开始向终点前进。当机器人到达(9,9)时会发现下一个点是障碍物:
图中右上角代表当前的k值,刚开始时k值与h值是一样的,所以这里没有列出h值,箭头方向代表点的父系方向。
此时算法会调用modify函数修改(9,9)的h值,因为下个点是障碍物,所以(9,9)的h值会变成inf。
然后算法再调用process_state来处理这个点。首先,算法会进入第一个IF语句:
从当前点附近寻找能使当前点代价值更小的,当然由于这个点目前是最优路径上的点,所以其周围的点的k值都应该是比它要大的。所以这里的if语句目前是不会进入的。算法遍历周围的8个点但是没有进行点的参数改变。然后算法进入第二个IF语句:
前面我们知道在修改点位信息的时候k值是没有改变的,所以这里会进ELSE部分,对当前点周围点再次进行遍历,但是这里的判断条件就不一样了。这里会根据两个条件判断:该点的父系是否是(9,9)以及它的值是否会跟之前不一致。例如对于点(8,8),当前它的父系是(9,9),所以它会进第二个IF语句,但是对于(8,9),它的父系不是(9,9),所以它会进第二个else。
这些判断会改变当前点周围8个邻近点的状态,初始时,这些点的状态为:
(8,10) h: 47.798989873223334 parent: (9, 11) k: 47.798989873223334
(9,10) h: 47.38477631085024 parent: (9, 11) k: 47.38477631085024
(10,10) h: 47.798989873223334 parent: (9, 11) k: 47.798989873223334
(8,9) h: 48.798989873223334 parent: (9, 10) k: 48.798989873223334
(9,9) h: 48.38477631085024 parent: (9, 10) k: 48.38477631085024
(10,9) h: 48.798989873223334 parent: (9, 10) k: 48.798989873223334
(8,8) h: 49.798989873223334 parent: (9, 9) k: 49.798989873223334
(9,8) h: 49.38477631085024 parent: (9, 9) k: 49.38477631085024
(10,8) h: 49.798989873223334 parent: (9, 9) k: 49.798989873223334
经过一次遍历后变为:
(8,10) h: 47.798989873223334 parent: (9, 11) k: 47.798989873223334
(9,10) h: 47.38477631085024 parent: (9, 11) k: 47.38477631085024
(10,10) h: 47.798989873223334 parent: (9, 11) k: 47.798989873223334
(8,9) h: 48.798989873223334 parent: (9, 10) k: 48.798989873223334
(9,9) h: inf parent: (9, 10) k: 48.38477631085024
(10,9) h: 48.798989873223334 parent: (9, 10) k: 48.798989873223334
(8,8) h: inf parent: (9, 9) k: 49.798989873223334
(9,8) h: inf parent: (9, 9) k: 49.38477631085024
(10,8) h: inf parent: (9, 9) k: 49.798989873223334
图中浅蓝色的代表当前遍历时被改变的点,可以看到它们的h值目前都暂时变成了inf。
同时刚才遍历的一部分点被插入到了OPEN list内,例如(8,9),而(8,10),(9,10),(10,10)由于不满足条件其不会被插入OPEN list中,因此现在k值最小的就变成了(8,9)。
同样的,遍历(8,9)周围的点,找到并修改它的值,这个时候,由于一些点的h值的变化,导致了它们的父系会发生变化,例如(9,9)的父系会从(9,10)变成(8,9),因为(9,10)是障碍物,它们之间的代价值是无穷大,显然走(8,9)代价值会小很多,因此(9,9)的下一步被改为了(8,9)。
然后继续循环,算法会依次遍历:(9,9)、(8,9):
(10, 9):
(9, 8):
(8, 9):
(8, 10):
等一系列顺序,最后修改后的结果为:
(9,9)的父系变成(8,9)
(8,9)的父系变为(8,10)
(8,10)的父系变为(8,11)
新路径为:
而(8,11)的父系为(9,12),也就绕过了障碍物,至此,一条新的路径就被规划出来了:
附上代码:
import os
import sys
import math
import matplotlib.pyplot as pltclass DStar:def __init__(self, s_start, s_goal, xI, xG):self.s_start, self.s_goal = s_start, s_goalself.x_range = 51 # size of backgroundself.y_range = 31self.motions = [(-1, 0), (-1, 1), (0, 1), (1, 1),(1, 0), (1, -1), (0, -1), (-1, -1)] # feasible input setself.obs = self.obs_map()self.xI, self.xG = xI, xGself.u_set = self.motionsself.obs = self.obsself.x = self.x_rangeself.y = self.y_rangeself.fig = plt.figure()self.flag = Trueself.OPEN = set()self.t = dict()self.PARENT = dict()self.h = dict()self.k = dict()self.path = []self.visited = set()self.count = 0def init(self):for i in range(self.x_range):for j in range(self.y_range):self.t[(i, j)] = 'NEW'self.k[(i, j)] = 0.0self.h[(i, j)] = float("inf")self.PARENT[(i, j)] = Noneself.h[self.s_goal] = 0.0def update_obs(self, obs):self.obs = obsdef animation(self, path, visited, name):self.plot_grid(name)self.plot_visited(visited)self.plot_path(path)plt.show()def plot_grid(self, name):obs_x = [x[0] for x in self.obs]obs_y = [x[1] for x in self.obs]plt.plot(self.xI[0], self.xI[1], "bs")plt.plot(self.xG[0], self.xG[1], "gs")plt.plot(obs_x, obs_y, "sk")plt.title(name)plt.axis("equal")def plot_visited(self, visited, cl='gray'):if self.xI in visited:visited.remove(self.xI)if self.xG in visited:visited.remove(self.xG)count = 0for x in visited:count += 1plt.plot(x[0], x[1], color=cl, marker='o')plt.gcf().canvas.mpl_connect('key_release_event',lambda event: [exit(0) if event.key == 'escape' else None])if count < len(visited) / 3:length = 20elif count < len(visited) * 2 / 3:length = 30else:length = 40## length = 15if count % length == 0:plt.pause(0.001)plt.pause(0.01)def plot_path(self, path, cl='r', flag=False):path_x = [path[i][0] for i in range(len(path))]path_y = [path[i][1] for i in range(len(path))]if not flag:plt.plot(path_x, path_y, linewidth='3', color='r')else:plt.plot(path_x, path_y, linewidth='3', color=cl)plt.plot(self.xI[0], self.xI[1], "bs")plt.plot(self.xG[0], self.xG[1], "gs")plt.pause(0.01)def obs_map(self):"""Initialize obstacles' positions:return: map of obstacles"""x = self.x_rangey = self.y_rangeobs = set()for i in range(x):obs.add((i, 0))for i in range(x):obs.add((i, y - 1))for i in range(y):obs.add((0, i))for i in range(y):obs.add((x - 1, i))for i in range(10, 21):obs.add((i, 15))for i in range(15):obs.add((20, i))for i in range(15, 30):obs.add((30, i))for i in range(16):obs.add((40, i))return obsdef run(self, s_start, s_end):self.init()self.insert(s_end, 0)while True:self.process_state()if self.t[s_start] == 'CLOSED':breakself.path = self.extract_path(s_start, s_end)print(self.path)self.plot_grid("Dynamic A* (D*)")self.plot_path(self.path)self.fig.canvas.mpl_connect('button_press_event', self.on_press)plt.show()def on_press(self, event):x, y = event.xdata, event.ydataif x < 0 or x > self.x - 1 or y < 0 or y > self.y - 1:print("Please choose right area!")else:x, y = int(x), int(y)if (x, y) not in self.obs:print("Add obstacle at: s =", x, ",", "y =", y)self.obs.add((x, y))self.update_obs(self.obs)s = self.s_startself.visited = set()self.count += 1self.flag = Trueself.numb = 0while s != self.s_goal:#print("s is in:",s)self.numb += 1if self.numb > 1000:print("no route")breakif self.is_collision(s, self.PARENT[s]):if self.flag == True:self.flag = Falseself.s_start = sself.modify(s)continues = self.PARENT[s]self.path = self.extract_path(self.s_start, self.s_goal)print(self.path)plt.cla()self.plot_grid("Dynamic A* (D*)")self.plot_visited(self.visited)self.plot_path(self.path)self.fig.canvas.draw_idle()def extract_path(self, s_start, s_end):path = [s_start]s = s_startwhile True:s = self.PARENT[s]path.append(s)if s == s_end:return pathdef process_state(self):s = self.min_state() # get node in OPEN set with min k valueself.visited.add(s)if s is None:return -1 # OPEN set is emptyk_old = self.get_k_min() # record the min k value of this iteration (min path cost)self.delete(s) # move state s from OPEN set to CLOSED setself.t[s] = 'CLOSED'# k_min < h[s] --> s: RAISE state (increased cost)if k_old < self.h[s]:for s_n in self.get_neighbor(s):if self.h[s_n] <= k_old and \self.h[s] > self.h[s_n] + self.cost(s_n, s):# update h_value and choose parentself.PARENT[s] = s_nself.h[s] = self.h[s_n] + self.cost(s_n, s)# s: k_min >= h[s] -- > s: LOWER state (cost reductions)if k_old == self.h[s]:for s_n in self.get_neighbor(s):if self.t[s_n] == 'NEW' or \(self.PARENT[s_n] == s and self.h[s_n] != self.h[s] + self.cost(s, s_n)) or \(self.PARENT[s_n] != s and self.h[s_n] > self.h[s] + self.cost(s, s_n)):# Condition:# 1) t[s_n] == 'NEW': not visited# 2) s_n's parent: cost reduction# 3) s_n find a better parentself.PARENT[s_n] = sself.insert(s_n, self.h[s] + self.cost(s, s_n))else:for s_n in self.get_neighbor(s):if self.t[s_n] == 'NEW' or \(self.PARENT[s_n] == s and self.h[s_n] != self.h[s] + self.cost(s, s_n)):# Condition:# 1) t[s_n] == 'NEW': not visited# 2) s_n's parent: cost reductionself.PARENT[s_n] = sself.insert(s_n, self.h[s] + self.cost(s, s_n))else:if self.PARENT[s_n] != s and \self.h[s_n] > self.h[s] + self.cost(s, s_n):# Condition: LOWER happened in OPEN set (s), s should be explored againself.k[s] = self.h[s]self.insert(s, self.h[s])else:if self.PARENT[s_n] != s and \self.h[s] > self.h[s_n] + self.cost(s_n, s) and \self.t[s_n] == 'CLOSED' and \self.h[s_n] > k_old:# Condition: LOWER happened in CLOSED set (s_n), s_n should be explored againself.insert(s_n, self.h[s_n])return self.get_k_min()def min_state(self):"""choose the node with the minimum k value in OPEN set.:return: state"""if not self.OPEN:return Nonereturn min(self.OPEN, key=lambda x: self.k[x])def get_k_min(self):"""calc the min k value for nodes in OPEN set.:return: k value"""if not self.OPEN:return -1return min([self.k[x] for x in self.OPEN])def insert(self, s, h_new):"""insert node into OPEN set.:param s: node:param h_new: new or better cost to come value"""if self.t[s] == 'NEW':self.k[s] = h_newelif self.t[s] == 'OPEN':self.k[s] = min(self.k[s], h_new)elif self.t[s] == 'CLOSED':self.k[s] = min(self.h[s], h_new)self.h[s] = h_newself.t[s] = 'OPEN'self.OPEN.add(s)def delete(self, s):"""delete: move state s from OPEN set to CLOSED set.:param s: state should be deleted"""if self.t[s] == 'OPEN':self.t[s] = 'CLOSED'self.OPEN.remove(s)def modify(self, s):"""start processing from state s.:param s: is a node whose status is RAISE or LOWER."""self.modify_cost(s)while True:if not self.OPEN:print("no route")breakk_min = self.process_state()if k_min >= self.h[s]:breakdef modify_cost(self, s):# if node in CLOSED set, put it into OPEN set.# Since cost may be changed between s - s.parent, calc cost(s, s.p) againif self.t[s] == 'CLOSED':self.insert(s, self.h[self.PARENT[s]] + 100*self.cost(s, self.PARENT[s]))def get_neighbor(self, s):nei_list = set()for u in self.u_set:s_next = tuple([s[i] + u[i] for i in range(2)])if s_next not in self.obs:nei_list.add(s_next)return nei_listdef cost(self, s_start, s_goal):"""Calculate Cost for this motion:param s_start: starting node:param s_goal: end node:return: Cost for this motion:note: Cost function could be more complicate!"""if self.is_collision(s_start, s_goal):return float("inf")return math.hypot(s_goal[0] - s_start[0], s_goal[1] - s_start[1])def is_collision(self, s_start, s_end):if s_start in self.obs:return Trueif s_end in self.obs:return Trueif s_start[0] != s_end[0] and s_start[1] != s_end[1]:if s_end[0] - s_start[0] == s_start[1] - s_end[1]:s1 = (min(s_start[0], s_end[0]), min(s_start[1], s_end[1]))s2 = (max(s_start[0], s_end[0]), max(s_start[1], s_end[1]))else:s1 = (min(s_start[0], s_end[0]), max(s_start[1], s_end[1]))s2 = (max(s_start[0], s_end[0]), min(s_start[1], s_end[1]))if s1 in self.obs or s2 in self.obs:return Truereturn Falsedef plot_path(self, path):px = [x[0] for x in path]py = [x[1] for x in path]plt.plot(px, py, linewidth=2)plt.plot(self.s_start[0], self.s_start[1], "bs")plt.plot(self.s_goal[0], self.s_goal[1], "gs")def plot_visited(self, visited):color = ['gainsboro', 'lightgray', 'silver', 'darkgray','bisque', 'navajowhite', 'moccasin', 'wheat','powderblue', 'skyblue', 'lightskyblue', 'cornflowerblue']if self.count >= len(color) - 1:self.count = 0for x in visited:plt.plot(x[0], x[1], marker='s', color=color[self.count])def main():s_start = (5, 5)s_goal = (45, 25)dstar = DStar(s_start, s_goal,s_start,s_goal)dstar.run(s_start, s_goal)if __name__ == '__main__':main()
注意:
1、上述代码的源码本来是来自于《路径规划算法》这篇文章的代码分享,但是在使用的过程中发现了一个问题:原论文中提到的是从遇到障碍物的位置开始规划,但是这篇文章中每次遇到障碍物后提取路径还是会从最初的起点开始提取,于是这篇文章中的代码中我发现了这么一个问题:当我连续在同一个点附近不断添加障碍物时,从最初的起点开始提取出来到往终点的路径会出现问题:
其实它的真实路径应该是这样子:
出现前面这种问题的原因是在对障碍物周围点的遍历时没有能够修改到足够远的点的父系,导致了从初始点开始的路径并没有被整个的修改掉。这里不确定是D* 算法本身的算法缺陷还是说我对它的文章理解还存在一定的问题。
2、D* 算法虽然会对路径上的障碍物点进行重新规划,但是对于现有的障碍物消除后的路径规划是无法进行的。即如果你将一个障碍物放到路径上,机器人绕行了,然后你去除了障碍物,下次机器人还是会进行绕行,并不会恢复最初的路径。这也是D* 算法的一点缺陷了。
不过总体而言D* 确实是一种非常优异的算法。
参考:
1、《路径规划算法》
2、《D*路径搜索算法原理解析及Python实现》
3、《D*算法超详解》
4、《D*算法原理与程序详解(Python)》
5、《D*算法(Dynamic A Star)》