路径规划——D*算法
D Star算法是一种用于动态环境下的算法,它可以在环境变化时快速更新路径。
算法原理
D Star算法是一种反向增量式搜索算法,反向即算法从目标点开始向起点逐步搜索;增量式搜索,即算法在搜索过程中会计算每一个节点的距离度量信息H(x),在动态环境中若出现障碍物无法继续沿预先路径搜索,算法会根据原先已经得到的每个点的距离度量信息在当前状态点进行路径再规划,无需从目标点进行重新规划。
算法流程:
1.起初,起点和终点的位置是已知的,环境中的障碍物未知或部分未知。初始化一个开放列表openlist(优先队列),将终点添加到该列表中,并将起点作为搜索目标。
2.在路径搜索过程中,D Star算法不使用启发信息,更像Dijkstra算法,由于是反向搜索,D Star算法计算每个节点到终点的成本代价,然后通过扩展节点来寻找从起点到终点的最优路径。在初次运行时,它使用Dijkstra算法生成一条从起点到终点的最短路径。
3.然而,当环境发生变化时,例如,检测到新的障碍物或路径变得不可通行时,要求算法可以动态更新路径。它通过回溯先前的路径,从遇到障碍物的节点开始重新计算新的路径。
4.在发生变化的区域重新执行D Star算法,从变化点开始,重新计算新的路径。D Star算法的一个关键特性是局部修复,即它只需要重新计算受到变化影响的部分路径,而不是整个路径。这使得算法在动态环境中能够高效地适应环境变化。
5.一旦路径更新完成,算法继续按照新的最短路径前进。每当环境再次变化时,重复上述步骤,直到达到终点。
D*算法有三个重要的函数:
1.process_state():该函数是用来降低openlist表中的某个节点x(state)的h(x)值或者传播节点x的h(x)值变更信息给邻居节点,让邻居节点进行相应变更的同时进行路径搜索查找的;
2.insert(x,val):该函数是用来修改节点x的状态以及h(x)值和k(x)值的;
3.modify_cost(x,y,val):该函数是用来修改节点x和y之间的移动代价cost(x,y),而且根据节点y的状态t(y)的情况,可能对节点y的h(y)值和状态t(y)进行修改的
函数process_state()是D*算法的核心,具体代码如下:
def process_state(self):node =self.min_state() # Find the node whose value of node.k is the smallest.self.searched.append(node.position)if node is None:return -1k_old = self.get_kmin() # Find the minimum node.k as k_oldself.delete(node) # Delete the node from openlist, and modify node.t from 'OPEN' to 'CLOSE'neighbors, distances = self.get_neighbors(node)# RASIE State, its path cost may not be optimalif k_old < node.h: for neighbor,distance in zip(neighbors,distances): #if neighbor.h <= k_old and node.h > neighbor.h + distance: #node.parent = neighbor.position # Added obstacles, replanning, find better neighbor #node.h = neighbor.h + distance # 检查node的最优相邻状态,如果存在则降低 h 值# LOWER State, its path cost is optimal since h ( X ) is equal to the old k_min .if k_old == node.h:for neighbor,distance in zip(neighbors,distances):if neighbor.t == 'NEW' or (neighbor.parent == node.position and neighbor.h != node.h+distance) \or (neighbor.parent != node.position and neighbor.h > node.h+distance):neighbor.parent = node.position # Searching commonlyself.insert(neighbor,node.h+distance)else:for neighbor,distance in zip(neighbors,distances):if neighbor.t == 'NEW' or (neighbor.parent == node.position and neighbor.h != node.h+distance):neighbor.parent = node.positionself.insert(neighbor,node.h+distance)else:if neighbor.parent != node.position and neighbor.h > node.h + distance:self.insert(node,node.h)else:if neighbor.parent != node.position and node.h > neighbor.h+distance \and neighbor.t == 'CLOSED' and neighbor.h > k_old:self.insert(neighbor,neighbor.h)return self.get_kmin()
下面解析此函数:
# RASIE State, its path cost may not be optimal
if k_old < node.h: #for neighbor,distance in zip(neighbors,distances): #if neighbor.h <= k_old and node.h > neighbor.h + distance: #node.parent = neighbor.position # Added obstacles, replanning, find better neighbor #node.h = neighbor.h + distance
“RASIE”状态,其路径代价可能不是最优的,可以通过重新计算邻居节点来寻找最优路径;主要是在障碍物更新之后发挥作用。
当前访问的节点受到了新增障碍物的影响(比如,当前节点的父节点是新增障碍物),向h值越小的方向扩展,即从当前节点向靠近目标节点的方向扩展,并更新当前节点的父节点以更新路径.
# LOWER State, its path cost is optimal since h ( X ) is equal to the old k_min .
if k_old == node.h:for neighbor,distance in zip(neighbors,distances):if neighbor.t == 'NEW' or (neighbor.parent == node.position and neighbor.h != node.h+distance) \or (neighbor.parent != node.position and neighbor.h > node.h+distance):neighbor.parent = node.position # Searching commonlyself.insert(neighbor,node.h+distance)
“LOWER”状态,其路径代价是最优的,正常搜索即可。
当前访问的节点还没有受新增障碍物的影响, 如果邻居节点neighbor的状态是"NEW"即未访问过, 或者邻居节点neighbor的父节点是当前节点node并且邻居节点的h值 不等于 当前节点的h值加上两节点间的距离,或者邻居节点neighbor的父节点不是当前节点node并且邻居节点的h值 大于 当前节点的h值加上两节点间的距离;那么!设邻居节点的父节点为当前节点,将neighbor节点放入openlist中进行访问搜索,并将其h值更新为当前节点的h值加上两节点间的距离.
else:for neighbor,distance in zip(neighbors,distances):if neighbor.t == 'NEW' or (neighbor.parent == node.position and neighbor.h != node.h+distance):neighbor.parent = node.positionself.insert(neighbor,node.h+distance)// 如果邻居节点neighbor的状态是"NEW"即未访问过,// 或者邻居节点neighbor的父节点是当前节点node并且邻居节点的h值 大于 当前节点的h值加上两节点间的距离// 那么!设邻居节点的父节点为当前节点,并且将邻居节点的h值更新为当前节点的h值加上两节点间的距离,// 将neighbor节点放入openlist中进行访问搜索,也就是将采用从当前节点node经过neighbor节点的路径(此路径不一定是最终选择的路径)else:if neighbor.parent != node.position and neighbor.h > node.h + distance:// 如果邻居节点neighbor的父节点不是当前访问节点node,// 并且邻居节点的h值 大于 当前节点node的h值加上两节点间的距离,那么邻居节点不是“好点”,// 为了更小的代价,便不应该从node经过neighbor,将node加入到openlist中重新访问搜索self.insert(node,node.h)else:if neighbor.parent != node.position and node.h > neighbor.h+distance \and neighbor.t == 'CLOSED' and neighbor.h > k_old:self.insert(neighbor,neighbor.h)// 如果邻居节点neighbor的父节点不是当前访问节点node,并且邻居节点是个“好点”,// 并且邻居节点的状态是“已访问”,同时邻居节点的h值 大于 k_old,这时node应该是新增的障碍物,// 这时应该添加neighbor到openlist中,对其进行访问搜索
其他的情况,如果邻居是新的或其父节点是当前节点但代价不匹配,则更新父节点并插入开放列表。如果邻居的父节点不是当前节点且代价不匹配,则可能需要插入当前节点或邻居节点进行进一步处理。
函数**process_state()**理解起来是有一定难度的,逻辑思路有一定的复杂性,大家可以结合具体案例慢慢理解。
D*论文链接:https://ieeexplore.ieee.org/stamp/stamp.jsp?tp=&arnumber=351061
算法实现
"""Filename: dStar.pyDescription: Plan path using Dynamic A* AlgorithmAuthor: Benxiaogu:https://github.com/BenxiaoguDate: 2024-08-29
"""import math
import numpy as np
import matplotlib.pyplot as plt
import matplotlib.animation as animation
import timeclass Node:"""Class for D* nodes.Parameters:position (tuple): current coordinateparent (tuple): coordinate of parent nodet (str): state of node, including `NEW` `OPEN` and `CLOSED`'NEW': the node was never put into openlist;'OPEN': the node is in openlist'CLOSED': the node is no longer in openlisth (float): cost from goal to current nodek (float): minimum cost from goal to current node in history"""def __init__(self, position, parent, t, h, k) -> None:self.position = positionself.parent = parentself.t = tself.h = hself.k = kdef __add__(self, node):return Node((self.x + node.x, self.y + node.y), self.parent, self.t, self.h + node.h, self.k)def __eq__(self, other) -> bool:return self.position == other.positiondef __lt__(self,other):return self.k+self.h < other.k+other.h or (self.k+self.h==other.k+other.h and self.h<other.h)class DStar:def __init__(self,grid,start,goal,board_size) -> None:self.grid = gridself.start = Node(start,None,'NEW',float('inf'),float('inf'))self.goal = Node(goal,None,'NEW',0,float('inf'))self.board_size = board_sizeself.path = []self.open = []self.searched = [] # Used to record nodes that are searchedgrid_map = []for i in range(len(self.grid)):for j in range(len(self.grid[0])):grid_map.append((i,j))self.map = {n: Node(n, None, 'NEW', float('inf'), float('inf')) for n in grid_map}self.map[self.goal.position] = self.goalself.map[self.start.position] = self.startself.insert(self.goal,0)self.fig, self.ax = plt.subplots()def run(self):cost = self.plan()self.fig.canvas.mpl_connect('button_press_event', self.update)self.visualize_grid(cost)def plan(self):while True:self.process_state()if self.start.t == 'CLOSED':breakself.searched.clear()# Find the goalpathnode = self.startcost = 0self.path.append(pathnode.position)while pathnode != self.goal:nodeparent = self.map[pathnode.parent]cost += self.cost(pathnode, nodeparent)pathnode = nodeparentself.path.append(pathnode.position)return costclass Neighbor:def __init__(self,direction,cost):self.direction = directionself.cost = costdef get_neighbors(self, current_node):neighbors = []distances = []node = current_node.positionnexts = [self.Neighbor((-1, 0),1), self.Neighbor((0, 1),1), self.Neighbor((0, -1),1), self.Neighbor((1,0),1),self.Neighbor((-1,1),math.sqrt(2)), self.Neighbor((1,1),math.sqrt(2)),self.Neighbor((1, -1),math.sqrt(2)), self.Neighbor((-1,-1),math.sqrt(2))]for next in nexts:neighbor = (node[0] + next.direction[0], node[1] + next.direction[1])neighborNode = self.map[neighbor]if not self.isCollision(node,neighbor):neighbors.append(neighborNode)distances.append(next.cost)return neighbors,distancesdef isCollision(self,node1,node2):"""Check if there will be a collision from node1 to node2"""if self.board_size <= node1[0] < len(self.grid)-self.board_size and self.board_size <= node1[1] < len(self.grid[0])-self.board_size \and self.board_size <= node2[0] < len(self.grid)-self.board_size and self.board_size <= node2[1] < len(self.grid[0])-self.board_size:if self.grid[node1[0]][node1[1]] == 0 and self.grid[node2[0]][node2[1]] == 0:direction1 = node1[0]-node2[0]direction2 = node1[1]-node2[1]if direction1 != 0 and direction2 != 0: # 对角线方向if self.grid[node1[0]][node2[1]] == 0 and self.grid[node2[0]][node1[1]] == 0:return Falseelse:return Falsereturn Truedef process_state(self):"""Compute optimal path costs to the goal"""node =self.min_state() # Find the node whose value of node.k is the smallest.self.searched.append(node.position)if node is None:return -1k_old = self.get_kmin() # Find the minimum node.k as k_oldself.delete(node) # Delete the node from openlist, and modify node.t from 'OPEN' to 'CLOSE'neighbors, distances = self.get_neighbors(node)# RASIE State, its path cost may not be optimalif k_old < node.h: #for neighbor,distance in zip(neighbors,distances): #if neighbor.h <= k_old and node.h > neighbor.h + distance: #node.parent = neighbor.position # Added obstacles, replanning, find better neighbor #node.h = neighbor.h + distance # 检查node的最优相邻状态,如果存在则降低 h 值# LOWER State, its path cost is optimal since h ( X ) is equal to the old k_min .if k_old == node.h:for neighbor,distance in zip(neighbors,distances):if neighbor.t == 'NEW' or (neighbor.parent == node.position and neighbor.h != node.h+distance) \or (neighbor.parent != node.position and neighbor.h > node.h+distance):neighbor.parent = node.position # Searching commonlyself.insert(neighbor,node.h+distance)else:for neighbor,distance in zip(neighbors,distances):if neighbor.t == 'NEW' or (neighbor.parent == node.position and neighbor.h != node.h+distance):neighbor.parent = node.positionself.insert(neighbor,node.h+distance)else:if neighbor.parent != node.position and neighbor.h > node.h + distance:self.insert(node,node.h)else:if neighbor.parent != node.position and node.h > neighbor.h+distance \and neighbor.t == 'CLOSED' and neighbor.h > k_old:self.insert(neighbor,neighbor.h)return self.get_kmin()def min_state(self):"""Return the node who possesses the minimum value of k."""if not self.open:return Nonemin_state = min(self.open, key=lambda x: x.k)return min_statedef get_kmin(self):"""Return the minimum value of k."""if not self.open:return -1k_min = min([x.k for x in self.open])return k_mindef delete(self, node) -> None:"""Remove node from openlist, and change node.t from 'OPEN' to 'CLOSED'"""if node.t == 'OPEN':node.t = 'CLOSED'self.open.remove(node)def insert(self, node: Node, h_new: float) -> None:"""Modify the state of node as well as the node.h value and node.k value."""if node.t == 'NEW': node.k = h_newelif node.t == 'OPEN': node.k = min(node.k, h_new)elif node.t == 'CLOSED': node.k = min(node.h, h_new)node.h = h_newnode.t = 'OPEN'self.open.append(node)def modify_cost(self,node,node_parent):"""Change the cost and enter affected nodes on the OPEN list."""if node.t == 'CLOSED':self.insert(node,node_parent.h+self.cost(node,node_parent))while True:k_min = self.process_state()if k_min >= node.h:breakdef cost(self, node1: Node, node2: Node) -> float:"""Return Euclidean distance if there is no collision from node1 to node2 otherwise 'inf'"""if self.isCollision(node1.position, node2.position):return float("inf")return math.hypot(node2.position[0] - node1.position[0], node2.position[1] - node1.position[1])def update(self, event) -> None:"""Update obstacles and replan from affected node."""x, y = int(event.xdata), int(event.ydata)if y < self.board_size or y > len(self.grid) -self.board_size- 1 or x < self.board_size or x > len(self.grid[0])-self.board_size - 1:print("Please choose right area!")else:self.searched.clear()if self.grid[y][x] == 0:# update obstaclesself.grid[y][x] = 1node = self.startcost = 0self.path.clear()while node != self.goal:node_parent = self.map[node.parent]if self.isCollision(node.position, node_parent.position): # when meeting collisions, call modify_cost() functionself.modify_cost(node,node_parent)continueself.path.append(node.position)cost += self.cost(node, node_parent)node = node_parentself.path.append(self.goal.position)plt.cla()self.visualize_grid(cost)def visualize_grid(self, cost):...
完整代码:https://github.com/Benxiaogu/PathPlanning/tree/main/D_Star