diff --git a/env/gaming_env.py b/env/gaming_env.py index 99fe451..32bfc2b 100644 --- a/env/gaming_env.py +++ b/env/gaming_env.py @@ -206,44 +206,10 @@ def step(self, actions=None): i = self.tanks.index(tank) # 1) Get BFS path - my_pos = tank.get_grid_position() + my_pos = tank.get_grid_position() opponent_pos = self.tanks[1 - i].get_grid_position() self.path = bfs_path(self.maze, my_pos, opponent_pos) - old_dist = None - next_cell = None - - # 2) If we have a BFS path - if self.path is not None and len(self.path) > 1: - next_cell = self.path[1] - current_bfs_dist = len(self.path) - r, c = next_cell - center_x = c * GRID_SIZE + (GRID_SIZE / 2) - center_y = r * GRID_SIZE + (GRID_SIZE / 2) - - # Get old distance - old_dist = self.euclidean_distance((tank.x, tank.y), (center_x, center_y)) - - # 3) Every 20 BFS steps, apply penalty based on path length - if self.run_bfs % 20 == 0: - if self.last_bfs_dist[i] is not None: - # If we have a stored previous distance, compare - if self.last_bfs_dist[i] is not None: - if current_bfs_dist < self.last_bfs_dist[i]: - # BFS distance decreased => reward - distance_diff = self.last_bfs_dist[i] - current_bfs_dist - - self.tanks[i].reward += BFS_PATH_LEN_REWARD * distance_diff - - elif current_bfs_dist >= self.last_bfs_dist[i]: - # BFS distance increased => penalize - distance_diff = current_bfs_dist - self.last_bfs_dist[i] + 1 - self.tanks[i].reward -= BFS_PATH_LEN_PENALTY * distance_diff - self.last_bfs_dist[i] = current_bfs_dist - - # Increment the BFS step counter - self.run_bfs += 1 - if tank.keys: if keys[tank.keys["left"]]: tank.rotate(ROTATION_DEGREE) elif keys[tank.keys["right"]]: tank.rotate(-ROTATION_DEGREE) @@ -286,62 +252,15 @@ def step(self, actions=None): current_actions = actions[i] # 5) Now the tank actually moves - tank.move(current_actions=current_actions) - - # 5) After move, measure new distance if next_cell is not None - if next_cell is not None and old_dist is not None: - r, c = next_cell - center_x = c * 
GRID_SIZE + (GRID_SIZE / 2) - center_y = r * GRID_SIZE + (GRID_SIZE / 2) - new_dist = self.euclidean_distance((tank.x, tank.y), (center_x, center_y)) - - if new_dist < old_dist: - self.tanks[i].reward += BFS_FORWARD_REWARD * (old_dist - new_dist) - elif new_dist > old_dist: - self.tanks[i].reward -= BFS_BACKWARD_PENALTY * (new_dist - old_dist) - - self.run_bfs += 1 + tank.move(current_actions=current_actions, maze = self.maze) # ========== AI ONLY MODE ========== else: for tank in self.tanks: i = self.tanks.index(tank) - # overall_bfs_dist = 0 - - # 2) BFS path my_pos = tank.get_grid_position() opponent_pos = self.tanks[1 - i].get_grid_position() - self.path = bfs_path(self.maze, my_pos,opponent_pos) - - self.run_bfs += 1 - old_dist = None - next_cell = None - if self.path is not None and len(self.path) > 1: - next_cell = self.path[1] - current_bfs_dist = len(self.path) - r, c = next_cell - center_x = c * GRID_SIZE + (GRID_SIZE / 2) - center_y = r * GRID_SIZE + (GRID_SIZE / 2) - old_dist = self.euclidean_distance((tank.x, tank.y), (center_x, center_y)) - if self.run_bfs % 20 == 0: - # If we have a stored previous distance, compare - if self.last_bfs_dist[i] is not None: - if current_bfs_dist < self.last_bfs_dist[i]: - # BFS distance decreased => reward - distance_diff = self.last_bfs_dist[i] - current_bfs_dist - - self.tanks[i].reward += BFS_PATH_LEN_REWARD * distance_diff - - elif current_bfs_dist >= self.last_bfs_dist[i]: - # BFS distance increased => penalize - distance_diff = current_bfs_dist - self.last_bfs_dist[i] + 1 - self.tanks[i].reward -= BFS_PATH_LEN_PENALTY * distance_diff - - - self.last_bfs_dist[i] = current_bfs_dist - - # Increment the BFS step counter - self.run_bfs += 1 + self.path = bfs_path(self.maze, my_pos, opponent_pos) i = self.tanks.index(tank) # **获取坦克索引** if actions[i][0] == 0: tank.rotate(ROTATION_DEGREE) # **左转** @@ -353,22 +272,10 @@ def step(self, actions=None): if actions[i][2] == 1: tank.shoot() # **射击** else: pass current_actions = 
actions[i] - tank.move(current_actions=current_actions) + tank.move(current_actions=current_actions, maze = self.maze) # ### NEW LOGIC ### # 5) After move, measure new distance if next_cell is not None - if next_cell is not None and old_dist is not None: - r, c = next_cell - center_x = c * GRID_SIZE + (GRID_SIZE / 2) - center_y = r * GRID_SIZE + (GRID_SIZE / 2) - new_dist = self.euclidean_distance((tank.x, tank.y), (center_x, center_y)) - - if new_dist < old_dist: - self.tanks[i].reward += BFS_FORWARD_REWARD * (old_dist - new_dist) - elif new_dist > old_dist: - self.tanks[i].reward -= BFS_BACKWARD_PENALTY * (new_dist - old_dist) - - self.run_bfs += 1 self.bullets_trajs = [traj for traj in self.bullets_trajs if not traj.update()] # -- Move bullets again or do collision checks if desired -- diff --git a/env/sprite.py b/env/sprite.py index bbb3884..d8ac840 100644 --- a/env/sprite.py +++ b/env/sprite.py @@ -5,6 +5,7 @@ from env.util import * import numpy as np from PIL import Image, ImageSequence, ImageEnhance +from env.bfs import * # Reward is now defined by teams @@ -172,6 +173,13 @@ def __init__(self, team, x, y, color, keys, mode, env): self.hittingWall = False self.mode = mode + # BFS + self.old_dist = None + self.next_cell = None + self.path = None + self.last_bfs_dist = None + self.run_bfs = 0 + # reward compute self.last_x, self.last_y = x, y # 记录上一次位置 self.stationary_steps = 0 # 站立不动的帧数 @@ -257,7 +265,7 @@ def get_corners(self, x=None, y=None, angle=None): ] return [center + c.rotate(angle) for c in corners] - def move(self, current_actions=None): + def move(self, current_actions=None, maze = None): if not self.alive: return @@ -266,6 +274,10 @@ def move(self, current_actions=None): new_y = self.y - self.speed * math.sin(rad) new_corners = self.get_corners(new_x, new_y) + # Find BFS Path + my_pos = self.get_grid_position() + opponent_pos = self.get_grid_position() + self.path = bfs_path(maze, my_pos, opponent_pos) '''Reward #1: hitting the wall''' # 
# Post-patch BFS reward helpers for the Tank class (env/sprite.py).
# In the patched Tank.move(), after self._dodge_reward(), these are dispatched as:
#     if self.path is not None and len(self.path) > 1:
#         self.bfs_reward_global()
#     if self.next_cell is not None and self.old_dist is not None:
#         self.bfs_reward_local()
# NOTE(review): move() computes `opponent_pos = self.get_grid_position()`, i.e.
# the tank's OWN cell, so bfs_path runs from the tank to itself — this is almost
# certainly meant to be the opponent's grid position; confirm against the env,
# which uses `self.tanks[1 - i].get_grid_position()`.

def euclidean_distance(self, cell_a, cell_b):
    """Return the Euclidean distance between two 2-D points.

    Despite the parameter names, callers pass both grid cells and raw
    pixel coordinates; the formula is identical either way.

    Args:
        cell_a: ``(x1, y1)`` pair.
        cell_b: ``(x2, y2)`` pair.

    Returns:
        float: straight-line distance between the two points.
    """
    (a1, a2) = cell_a
    (b1, b2) = cell_b
    return math.hypot(a1 - b1, a2 - b2)

def bfs_reward_global(self):
    """Score the change in BFS path length to the opponent.

    Must only be called when ``self.path`` holds at least two cells.
    Caches the next waypoint (``self.next_cell``) and the tank's current
    pixel distance to it (``self.old_dist``) for :meth:`bfs_reward_local`,
    and on every 10th call compares the BFS path length with the previous
    stored value: shorter path => reward, equal-or-longer => penalty.
    """
    self.next_cell = self.path[1]
    current_bfs_dist = len(self.path)
    r, c = self.next_cell
    # Pixel-space centre of the next waypoint cell (path cells are (row, col)).
    center_x = c * GRID_SIZE + (GRID_SIZE / 2)
    center_y = r * GRID_SIZE + (GRID_SIZE / 2)

    # Distance to the waypoint *before* the move; bfs_reward_local()
    # compares the post-move distance against this.
    self.old_dist = self.euclidean_distance((self.x, self.y), (center_x, center_y))

    # Every 10 BFS evaluations, reward/penalize the path-length trend.
    if self.run_bfs % 10 == 0:
        # (The original nested this identical None-check twice; once suffices.)
        if self.last_bfs_dist is not None:
            if current_bfs_dist < self.last_bfs_dist:
                # BFS distance decreased => closing in on the opponent.
                distance_diff = self.last_bfs_dist - current_bfs_dist
                self.reward += BFS_PATH_LEN_REWARD * distance_diff
            else:
                # Same length or longer => penalize; the +1 also penalizes
                # making no progress at all.
                distance_diff = current_bfs_dist - self.last_bfs_dist + 1
                self.reward -= BFS_PATH_LEN_PENALTY * distance_diff
        self.last_bfs_dist = current_bfs_dist

    # Count BFS evaluations (drives the % 10 gate above).
    self.run_bfs += 1

def bfs_reward_local(self):
    """Dense shaping reward for moving toward the cached BFS waypoint.

    Compares the tank's post-move distance to ``self.next_cell`` against
    ``self.old_dist`` recorded before the move: closer => reward,
    farther => penalty, unchanged => neutral.
    """
    r, c = self.next_cell
    center_x = c * GRID_SIZE + (GRID_SIZE / 2)
    center_y = r * GRID_SIZE + (GRID_SIZE / 2)
    new_dist = self.euclidean_distance((self.x, self.y), (center_x, center_y))

    if new_dist < self.old_dist:
        self.reward += BFS_FORWARD_REWARD * (self.old_dist - new_dist)
    elif new_dist > self.old_dist:
        self.reward -= BFS_BACKWARD_PENALTY * (new_dist - self.old_dist)

def _rotate_penalty(self):
    """Reward #7: Penalize excessive rotation without movement"""
    # Calculate distance moved since last rotation check