From 51efc257b330c64591aaac93ca0bd99d435a1a11 Mon Sep 17 00:00:00 2001 From: kenblu24 Date: Fri, 19 Sep 2025 17:49:32 -0400 Subject: [PATCH 1/6] wip relative position sensor (cherry picked from commit f435adbe23993d583c28c5ac07a9045d45c1b920) --- src/swarmsim/sensors/RelativeAgentSensor.py | 248 ++++++++++++++++++++ 1 file changed, 248 insertions(+) create mode 100644 src/swarmsim/sensors/RelativeAgentSensor.py diff --git a/src/swarmsim/sensors/RelativeAgentSensor.py b/src/swarmsim/sensors/RelativeAgentSensor.py new file mode 100644 index 0000000..03b2109 --- /dev/null +++ b/src/swarmsim/sensors/RelativeAgentSensor.py @@ -0,0 +1,248 @@ +from swarmsim.world.RectangularWorld import RectangularWorld +import pygame +import numpy as np +import math +from .AbstractSensor import AbstractSensor +from typing import List +from ..world.goals.Goal import CylinderGoal +from ..util.collider.AABB import AABB + +from typing import TYPE_CHECKING +if TYPE_CHECKING: + from ..world.World import World +else: + World = None + +import warnings +import quads + + +class BinaryFOVSensor(AbstractSensor): + config_vars = AbstractSensor.config_vars + [ + 'theta', 'distance', 'bias', 'false_positive', 'false_negative', + 'walls', 'wall_sensing_range', 'time_step_between_sensing', 'invert', + 'store_history', 'detect_goal_with_added_state', 'show', 'target_team' + ] + + DEBUG = False + + def __init__( + self, + agent=None, + parent=None, + distance=100.0, + time_step_between_sensing=1, + store_history=False, + show=True, + seed=None, + target_team=None, + **kwargs + ): + super().__init__(agent=agent, parent=parent, seed=seed, **kwargs) + self.time_step_between_sensing = time_step_between_sensing + self.time_since_last_sensing = 0 + self.history = [] + self.store_history = store_history + self.show = show + self.target_team = target_team + + self.detect_only_origins = False + + self.r = distance + + def checkForLOSCollisions(self, world: RectangularWorld) -> None: + # Mathematics obtained from Sundaram Ramaswamy + # https://legends2k.github.io/2d-fov/design.html + # See section 3.1.1.2 + self.time_since_last_sensing += 1 + if self.time_since_last_sensing % self.time_step_between_sensing != 0: + # Our sensing rate occurs less frequently than our dt physics update, so we need to + # only check for LOS collisions every n timesteps. + return + + self.time_since_last_sensing = 0 + + sensor_origin = self.agent.pos + + if world.quad: + # use world.quad that tracks agent positions to retrieve the agents within the minimal rectangle that contains the FOV sector + quadpoints = [point.data for point in world.quad.within_bb( + quads.BoundingBox(*self.getAARectContainingSector(world)))] + agents = np.array([agent for data in quadpoints for agent in data], dtype=object) + else: + agents = np.array(world.population, dtype=object) + # filter agents to those within the sensing radius + if agents.size: # if any agents found + positions = np.array([agent.pos for agent in agents]) + distances = np.linalg.norm(positions - sensor_origin, axis=1) + is_close = distances < self.r + bag = agents[is_close] + else: + bag = [] + + detected = [] + + for other in bag: + if other is self.agent: # skip the agent the sensor is attached to + continue + + if self.target_team and not other.team == self.target_team: + continue + + u = other.pos - sensor_origin # vector to agent + + + # if an agent was in the fov then this function would have returned, so determine the sensing state to be false + self.determineState(False, None, world) + return + + def step(self, world, only_check_goals=False): + super(BinaryFOVSensor, self).step(world=world) + self.checkForLOSCollisions(world=world) + if self.store_history: + if self.agent.agent_in_sight: + self.history.append(int(self.agent.agent_in_sight.name)) + else: + self.history.append(-1) + + def draw(self, screen, offset=((0, 0), 1.0)): + super(BinaryFOVSensor, self).draw(screen, offset) + pan, zoom = np.asarray(offset[0]), np.asarray(offset[1]) + zoom: float + if self.show: + # Draw Sensory Vector (Vision Vector) + sight_color = (255, 0, 0) + if self.current_state == 1: + sight_color = (0, 255, 0) + if self.current_state == 2: + sight_color = (255, 255, 0) + + # draw the whiskers + # length = actual sensor range if agent is selected/highlighted, otherwise draw relative to agent radius + magnitude = self.r if self.agent.is_highlighted else self.agent.radius * 5 + + head = np.asarray(self.agent.getPosition()) * zoom + pan + e_left, e_right = self.getSectorVectors() + e_left, e_right = np.asarray(e_left[:2]), np.asarray(e_right[:2]) + + tail_l = head + magnitude * e_left * zoom + tail_r = head + magnitude * e_right * zoom + + pygame.draw.line(screen, sight_color, head, tail_l) + pygame.draw.line(screen, sight_color, head, tail_r) + if self.agent.is_highlighted: + width = max(1, round(0.01 * zoom)) + # pygame.draw.circle(screen, sight_color + (50,), head, self.r * zoom, width) + # draw the arc of the sensor cone + range_bbox = AABB.from_center_wh(head, self.r * 2 * zoom) + langle = self.agent.angle + self.angle + self.theta + rangle = self.agent.angle + self.angle - self.theta + pygame.draw.arc(screen, sight_color + (50,), range_bbox.to_rect(), -langle, -rangle, width) + + if not self.DEBUG: + return + # DEBUG DRAWINGS: + if self.wall_sensing_range: + pygame.draw.circle(screen, (150, 150, 150, 50), head, self.wall_sensing_range * zoom, width) + AAR = self.getAARectContainingSector(self.agent.world) + AARtl = np.array(AAR[:2]) * zoom + pan + AARbr = np.array(AAR[2:]) * zoom + pan + pygame.draw.rect(screen, sight_color + (50,), pygame.Rect(*AARtl, *(AARbr - AARtl)), width) + detected = [point.data for point in self.agent.world.quad.within_bb(quads.BoundingBox(*AAR))] + for agent in detected: + pygame.draw.circle(screen, pygame.colordict.THECOLORS["blue"], agent.pos * zoom + pan, agent.radius * zoom, width * 3) + + # this function has been replaced by a more efficient procedure in checkForLOSCollisions and is no longer called there + def circle_interesect_sensing_cone(self, u, r): + e_left, e_right = self.getSectorVectors() + directional = np.dot(u, self.getBiasedSightAngle()) + if directional > 0: + u = np.append(u, [0]) + cross_l = np.cross(e_left, u) + cross_r = np.cross(u, e_right) + sign_l = np.sign(cross_l) + sign_r = np.sign(cross_r) + added_signs = sign_l - sign_r + sector_boundaries = np.all(added_signs == 0) + if sector_boundaries: + d_to_inter = np.linalg.norm(u) + return d_to_inter + + # It may also be the case that the center of the agent is not within the FOV, but that some part of the + # circle is visible and on the edges of the left and right viewing vectors. + # LinAlg Calculations obtained from https://www.bluebill.net/circle_ray_intersection.html + + # u, defined earlier is the vector from the point of interest to the center of the circle + # Project u onto e_left and e_right + u_l = np.dot(u, e_left) * e_left + u_r = np.dot(u, e_right) * e_right + + # Determine the minimum distance between the agent's center (center of circle) and the projected vector + dist_l = np.linalg.norm(u - u_l) + dist_r = np.linalg.norm(u - u_r) + + radius = r # Note: Assumes homogenous radius + if dist_l < radius: + d_to_inter = np.linalg.norm(u) + return d_to_inter + if dist_r < radius: + d_to_inter = np.linalg.norm(u) + return d_to_inter + return None + + def withinRadiusExclusiveFast(self, origin, other, radius): + diff = origin - other + return np.dot(diff, diff) < radius**2 + + def getLOSVector(self) -> List: + if self.angle is None: + return self.agent.orientation_uvec() + + return [ + math.cos(self.angle + self.agent.angle), + math.sin(self.angle + self.agent.angle) + ] + + def getBiasedSightAngle(self): + angle: float = self.agent.angle + self.bias + return vectorize(angle) + + def getSectorVectors(self): + angle: float = self.agent.angle + self.bias + span: float = self.theta + + leftBorder = vectorize(angle + span) + rightBorder = vectorize(angle - span) + return np.append(leftBorder, 0), np.append(rightBorder, 0) + + def as_config_dict(self): + return { + "type": "BinaryFOVSensor", + "theta": self.theta, + "bias": self.bias, + "fp": self.fp, + "fn": self.fn, + "time_step_between_sensing": self.time_step_between_sensing, + "store_history": self.store_history, + "use_goal_state": self.use_goal_state, + "wall_sensing_range": self.wall_sensing_range, + "goal_sensing_range": self.goal_sensing_range, + "agent_sensing_range": self.r, + "seed": self.seed, + } + + @staticmethod + def from_dict(d): + return BinaryFOVSensor( + parent=None, + theta=d["theta"], + distance=d["agent_sensing_range"], + bias=d["bias"], + false_positive=d.get("fp", 0.0), + false_negative=d.get("fn", 0.0), + store_history=d["store_history"], + detect_goal_with_added_state=d["use_goal_state"], + wall_sensing_range=d["wall_sensing_range"], + goal_sensing_range=d["goal_sensing_range"], + seed=d["seed"] if "seed" in d else None, + ) From 7691ab14222112c652c9de2eb03dfb92285b5a70 Mon Sep 17 00:00:00 2001 From: kenblu24 Date: Sun, 21 Sep 2025 23:09:41 -0400 Subject: [PATCH 2/6] use numpy for BinaryFOVSensor bounding box calculations (cherry picked from commit 51eaf9d29bcdd01dbc1235a0c88889fb1996862c) --- src/swarmsim/sensors/AbstractSensor.py | 12 +- src/swarmsim/sensors/BinaryFOVSensor.py | 161 ++++++++++-------------- src/swarmsim/sensors/RangedSensor.py | 116 +++++++++++++++++ 3 files changed, 192 insertions(+), 97 deletions(-) create mode 100644 src/swarmsim/sensors/RangedSensor.py diff --git a/src/swarmsim/sensors/AbstractSensor.py b/src/swarmsim/sensors/AbstractSensor.py index 515aaa9..57164a1 100644 --- a/src/swarmsim/sensors/AbstractSensor.py +++ b/src/swarmsim/sensors/AbstractSensor.py @@ -13,8 +13,16 @@ class AbstractSensor: config_vars = ['static_position', 'n_possible_states', 'show'] - def __init__(self, agent=None, parent=None, static_position=None, n_possible_states=0, - draw=True, seed=None, **kwargs): + def __init__( + self, + agent=None, + parent=None, + static_position=None, + n_possible_states=0, + draw=True, + seed=None, + **kwargs + ): """Sensor class for the agent. Sensors should typically have a parent that is assigned to them that must be of subclass 'Agent' diff --git a/src/swarmsim/sensors/BinaryFOVSensor.py b/src/swarmsim/sensors/BinaryFOVSensor.py index 6c97f18..4697471 100644 --- a/src/swarmsim/sensors/BinaryFOVSensor.py +++ b/src/swarmsim/sensors/BinaryFOVSensor.py @@ -2,7 +2,7 @@ import pygame import numpy as np import math -from .AbstractSensor import AbstractSensor +from .RangedSensor import RangedSensor from typing import List from ..world.goals.Goal import CylinderGoal from ..util.collider.AABB import AABB @@ -19,13 +19,18 @@ # convert an angle to a representative unit vector def vectorize(angle): - return np.array((np.cos(angle), np.sin(angle))) + return np.asarray((np.cos(angle), np.sin(angle))) # compute the vector turn value from origin→p1 to origin→p2 # this value is positive if a left turn is the fastest way to go from p1 to p2, zero if p1 and p2 are colinear, and negative otherwise def turn(p1, p2): - return p1[0] * p2[1] - p2[0] * p1[1] + return np.linalg.det(np.vstack((p1[:2], p2[:2]))) + + +def rad02pi(angle: float | np.ndarray) -> np.ndarray | float: + x = np.fmod(angle, 2 * np.pi) + return np.where(x < 0, 2 * np.pi + x, x) # project vector a onto vector b @@ -39,14 +44,32 @@ def lineCircleIntersect(line, center, radius): return np.dot(clDiffVec, clDiffVec) <= radius**2 -class BinaryFOVSensor(AbstractSensor): - config_vars = AbstractSensor.config_vars + [ - 'theta', 'distance', 'bias', 'false_positive', 'false_negative', +PIC = 2 * np.pi +axes_angles = np.array([0, np.pi / 2, np.pi, 3 * np.pi / 2]) +axes_points = np.array([ + [1, 0], + [0, 1], + [-1, 0], + [0, -1], +]) +zeros2 = np.zeros(2) + + +def axes_in_fov(lower, upper): + # used to help expand the AABB to include the arc of the sensor cone + # explanation: https://stackoverflow.com/questions/66799475/how-to-elegantly-find-if-an-angle-is-between-a-range + mask = np.remainder(axes_angles - lower, PIC) > np.remainder(upper - lower, PIC) + return axes_points[mask] + + +class BinaryFOVSensor(RangedSensor): + config_vars = RangedSensor.config_vars + [ + 'theta', 'bias', 'false_positive', 'false_negative', 'walls', 'wall_sensing_range', 'time_step_between_sensing', 'invert', 'store_history', 'detect_goal_with_added_state', 'show', 'target_team' ] - DEBUG = False + DEBUG = True def __init__( self, @@ -64,12 +87,22 @@ def __init__( invert=False, store_history=False, detect_goal_with_added_state=False, + detect_only_origins=False, show=True, seed=None, target_team=None, **kwargs ): - super().__init__(agent=agent, parent=parent, seed=seed, **kwargs) + super().__init__( + agent=agent, + parent=parent, + distance=distance, + time_step_between_sensing=time_step_between_sensing, + detect_only_origins=detect_only_origins, + target_team=target_team, + seed=seed, + **kwargs + ) self.angle = 0.0 self.theta = theta self.bias = bias @@ -89,7 +122,7 @@ def __init__( self.detection_id = 0 self.target_team = target_team - self.detect_only_origins = False + self.detect_only_origins = detect_only_origins NOTFOUND = object() if (degrees := kwargs.pop('degrees', NOTFOUND)) is not NOTFOUND: @@ -103,31 +136,8 @@ def checkForLOSCollisions(self, world: RectangularWorld) -> None: # Mathematics obtained from Sundaram Ramaswamy # https://legends2k.github.io/2d-fov/design.html # See section 3.1.1.2 - self.time_since_last_sensing += 1 - if self.time_since_last_sensing % self.time_step_between_sensing != 0: - # Our sensing rate occurs less frequently than our dt physics update, so we need to - # only check for LOS collisions every n timesteps. - return - - self.time_since_last_sensing = 0 - - sensor_origin = self.agent.pos - if world.quad: - # use world.quad that tracks agent positions to retrieve the agents within the minimal rectangle that contains the FOV sector - quadpoints = [point.data for point in world.quad.within_bb( - quads.BoundingBox(*self.getAARectContainingSector(world)))] - agents = np.array([agent for data in quadpoints for agent in data], dtype=object) - else: - agents = np.array(world.population, dtype=object) - # filter agents to those within the sensing radius - if agents.size: - positions = np.array([agent.pos for agent in agents]) - distances = np.linalg.norm(positions - sensor_origin, axis=1) - is_close = distances < self.r - bag = agents[is_close] - else: - bag = [] + bag = self.get_agents_within_range(world) # get left and right whiskers e_left, e_right = self.getSectorVectors() @@ -136,10 +146,11 @@ def checkForLOSCollisions(self, world: RectangularWorld) -> None: # TODO: Rewrite all this to use WorldObjects # see https://www.geeksforgeeks.org/check-if-two-given-line-segments-intersect/ consideration_set = [] + origin = self.position if self.walls is not None: # Get e_left, e_right line_segments - l = [sensor_origin, sensor_origin + (e_left[:2] * self.wall_sensing_range)] # noqa: E741 - r = [sensor_origin, sensor_origin + (e_right[:2] * self.wall_sensing_range)] + l = [origin, origin + (e_left[:2] * self.wall_sensing_range)] # noqa: E741 + r = [origin, origin + (e_right[:2] * self.wall_sensing_range)] wall_top = [self.walls[0], [self.walls[1][0], self.walls[0][1]]] wall_right = [[self.walls[1][0], self.walls[0][1]], self.walls[1]] wall_bottom = [self.walls[1], [self.walls[0][0], self.walls[1][1]]] @@ -150,7 +161,7 @@ def checkForLOSCollisions(self, world: RectangularWorld) -> None: for line in [l, r]: if self.lines_segments_intersect(line, wall): d_to_inter = np.linalg.norm( - np.array(self.line_seg_int_point(line, wall)) - np.array(sensor_origin)) + np.array(self.line_seg_int_point(line, wall)) - np.array(origin)) consideration_set.append((d_to_inter, None)) # Detect for World Objects @@ -180,7 +191,7 @@ def checkForLOSCollisions(self, world: RectangularWorld) -> None: if self.target_team and not agent.team == self.target_team: continue - u = agent.getPosition() - sensor_origin # vector to agent + u = agent.getPosition() - origin # vector to agent leftTurn = turn(u, e_left) rightTurn = turn(u, e_right) @@ -208,73 +219,33 @@ def checkForLOSCollisions(self, world: RectangularWorld) -> None: self.determineState(False, None, world) return + def make_bounding_box(self): + return self.getAARectContainingSector(self.agent.world) # get the smallest rectangle that contains the sensor fov sector + # get the smallest rectangle that contains the sensor fov sector def getAARectContainingSector(self, world: RectangularWorld): angle: float = self.agent.angle + self.bias # global sensor angle span: float = self.theta # angle fov sweeps to either side radius: float = self.r # view radius - position: list[float] = self.agent.pos.tolist() # agent global position - - over180 = np.pi <= span * 2 # true if 180 <= FOV - - center = vectorize(angle) # vector representing absolute look direction - leftWhisker = vectorize(angle + span) # vector representing left whisker - rightWhisker = vectorize(angle - span) # vector representing right whisker - xaxis = (1, 0) # vector representing positive x axis - yaxis = (0, 1) # vector representing positive y axis - - xts = np.sign(turn(xaxis, center)) # sign of turn from x axis to look direction - fovOverXAxis = np.sign(turn(xaxis, leftWhisker)) != np.sign(turn(xaxis, rightWhisker)) # true if turns from x axis to whiskers have different signs - - yts = np.sign(turn(yaxis, center)) # sign of turn from y axis to look direction - fovOverYAxis = np.sign(turn(yaxis, leftWhisker)) != np.sign(turn(yaxis, rightWhisker)) # true if turns from y axis to whiskers have different signs - - xmin = 0 - xmax = 0 - ymin = 0 - ymax = 0 - - def xadd(val): # extend either xmin or xmax if outside current range - nonlocal xmin - if val < xmin: - xmin = val - nonlocal xmax - if xmax < val: - xmax = val - - def yadd(val): # extend either ymin or ymax if outside current range - nonlocal ymin - if val < ymin: - ymin = val - nonlocal ymax - if ymax < val: - ymax = val - - # consider the x coordinates of the whisker ends - xadd(leftWhisker[0] * radius) - xadd(rightWhisker[0] * radius) - if fovOverXAxis: # if over x axis, x range is maximized to radius either left or right - xadd(radius * -yts) # left or right is determined by the negated sign of the turn from the positive y axis to the look direction - if over180: # if also over 180, then y range is maximized to radius in the direction closer to the look direction - yadd(radius * xts) - if not fovOverYAxis: # if over x axis, over 180, and not over y axis, y range is maximized in both directions - yadd(radius * -xts) - - # consider the y coordinates of the whisker ends - yadd(leftWhisker[1] * radius) - yadd(rightWhisker[1] * radius) - if fovOverYAxis: # if over y axis, y range is maximized to radius either up or down - yadd(radius * xts) # up or down is determined by the sign of the turn from the positive x axis to the look direction - if over180: # if also over 180, then x range is maximized to radius in the direction closer to the look direction - xadd(radius * -yts) - if not fovOverXAxis: # if over y axis, over 180, and not over x axis, x range is maximized in both directions - xadd(radius * yts) + position = self.position # this padding of the rectangle is to account for and detect agents that would only be seen by the whisker circle intercept correction padding = 0 if self.detect_only_origins else world.max_agent_r + lower = angle + span + upper = angle - span + angles = [lower, upper] + wpoints = vectorize(angles).T + + cpoints = axes_in_fov(lower, upper) # add points to include the arc in the AABB + + points = np.vstack((wpoints, cpoints, zeros2)) + points *= radius + mins = points.min(axis=0) + position - padding + maxs = points.max(axis=0) + position + padding + # positions are relative until now, make them absolute for the return - return [position[0] + xmin - padding, position[1] + ymin - padding, position[0] + xmax + padding, position[1] + ymax + padding] + return [*mins, *maxs] # xmin, ymin, xmax, ymax def check_goals(self, world): @@ -416,7 +387,7 @@ def draw(self, screen, offset=((0, 0), 1.0)): AARtl = np.array(AAR[:2]) * zoom + pan AARbr = np.array(AAR[2:]) * zoom + pan pygame.draw.rect(screen, sight_color + (50,), pygame.Rect(*AARtl, *(AARbr - AARtl)), width) - detected = [point.data for point in self.agent.world.quad.within_bb(quads.BoundingBox(*AAR))] + detected = self.agents_within_range for agent in detected: pygame.draw.circle(screen, pygame.colordict.THECOLORS["blue"], agent.pos * zoom + pan, agent.radius * zoom, width * 3) diff --git a/src/swarmsim/sensors/RangedSensor.py b/src/swarmsim/sensors/RangedSensor.py new file mode 100644 index 0000000..79f39c0 --- /dev/null +++ b/src/swarmsim/sensors/RangedSensor.py @@ -0,0 +1,116 @@ +from swarmsim.world.RectangularWorld import RectangularWorld +import pygame +import numpy as np +import math +from .AbstractSensor import AbstractSensor +from typing import List +from ..util.collider.AABB import AABB + +from typing import TYPE_CHECKING +if TYPE_CHECKING: + from ..world.World import World +else: + World = None + +import warnings +import quads + + +class RangedSensor(AbstractSensor): + config_vars = AbstractSensor.config_vars + [ + 'distance', 'time_step_between_sensing', 'detect_only_origins', + 'show', 'target_team' + ] + + DEBUG = False + + def __init__( + self, + agent=None, + parent=None, + + distance=100.0, + time_step_between_sensing=1, + detect_only_origins=False, + target_team=None, + **kwargs + ): + super().__init__(agent=agent, parent=parent, **kwargs) + self.time_step_between_sensing = time_step_between_sensing + self.time_since_last_sensing = 0 + + self.detect_only_origins = detect_only_origins + self.target_team = target_team + self.agents_within_range = [] + self.r = distance + + def ready_to_sense(self): + # Our sensing rate occurs less frequently than our dt physics update, so we need to + # only check for LOS collisions every n timesteps. + self.time_since_last_sensing += 1 + if self.time_since_last_sensing % self.time_step_between_sensing == 0: + self.time_since_last_sensing = 0 + return True + return False + + def make_bounding_box(self): + mins = self.agent.pos - self.r + maxs = self.agent.pos + self.r + return *mins, *maxs + + def get_agents_within_range(self, world: RectangularWorld) -> None: + + infinite_range = self.r == float('inf') + + if world.quad and not infinite_range: + # use world.quad that tracks agent positions to retrieve the agents within the minimal rectangle that contains the FOV sector + quadpoints = [point.data for point in world.quad.within_bb( + quads.BoundingBox(*self.make_bounding_box()))] + agents = np.array([agent for data in quadpoints for agent in data], dtype=object) + else: + agents = np.array(world.population, dtype=object) + # filter agents to those within the sensing radius + if infinite_range: + bag = agents + elif agents.size: # if any agents found + positions = np.array([agent.pos for agent in agents]) + distances = np.linalg.norm(positions - self.position, axis=1) + is_close = distances < self.r + bag = agents[is_close] + else: + bag = [] + + if self.target_team: + bag = [agent for agent in bag if agent.team == self.target_team] + else: + bag = agents.tolist() + + self.agents_within_range = bag + return bag + + def draw(self, screen, offset=((0, 0), 1.0)): + super().draw(screen, offset) + pan, zoom = np.asarray(offset[0]), np.asarray(offset[1]) + zoom: float + if self.show: + # Draw Sensory Vector (Vision Vector) + color = (150, 150, 150) + spos = self.position * zoom + pan + + # draw the whiskers + # length = actual sensor range if agent is selected/highlighted, otherwise draw relative to agent radius + if self.agent.is_highlighted: + width = max(1, round(0.01 * zoom)) + pygame.draw.circle(screen, color + (50,), spos, self.r * zoom, width) + # draw the arc of the sensor cone + + if not self.DEBUG: + return + # DEBUG DRAWINGS: + AAR = self.make_bounding_box() * zoom + pan + AARtl = AAR[:2] + AARbr = AAR[2:] + pygame.draw.rect(screen, color + (50,), pygame.Rect(*AARtl, *(AARbr - AARtl)), width) + detected = self.agents_within_range + for agent in detected: + pygame.draw.circle(screen, pygame.colordict.THECOLORS["blue"], agent.pos * zoom + pan, agent.radius * zoom, width * 3) From 893554ae95a4fe940e70411ccb335b55ffa68f3a Mon Sep 17 00:00:00 2001 From: kenblu24 Date: Sun, 21 Sep 2025 23:10:38 -0400 Subject: [PATCH 3/6] wip relative position sensor (cherry picked from commit 279bab02a3c388833df6dd82f57f240912d7207e) --- src/swarmsim/sensors/RelativeAgentSensor.py | 223 +++----------------- 1 file changed, 26 insertions(+), 197 deletions(-) diff --git a/src/swarmsim/sensors/RelativeAgentSensor.py b/src/swarmsim/sensors/RelativeAgentSensor.py index 03b2109..19e1445 100644 --- a/src/swarmsim/sensors/RelativeAgentSensor.py +++ b/src/swarmsim/sensors/RelativeAgentSensor.py @@ -2,9 +2,8 @@ import pygame import numpy as np import math -from .AbstractSensor import AbstractSensor +from .RangedSensor import RangedSensor from typing import List -from ..world.goals.Goal import CylinderGoal from ..util.collider.AABB import AABB from typing import TYPE_CHECKING @@ -17,13 +16,7 @@ import quads -class BinaryFOVSensor(AbstractSensor): - config_vars = AbstractSensor.config_vars + [ - 'theta', 'distance', 'bias', 'false_positive', 'false_negative', - 'walls', 'wall_sensing_range', 'time_step_between_sensing', 'invert', - 'store_history', 'detect_goal_with_added_state', 'show', 'target_team' - ] - +class RelativeAgentSensor(RangedSensor): DEBUG = False def __init__( @@ -32,17 +25,24 @@ def __init__( parent=None, distance=100.0, time_step_between_sensing=1, - store_history=False, show=True, - seed=None, target_team=None, + **kwargs ): - super().__init__(agent=agent, parent=parent, seed=seed, **kwargs) + super().__init__( + agent=agent, + parent=parent, + distance=distance, + time_step_between_sensing=time_step_between_sensing, + detect_only_origins=True, + target_team=target_team, + **kwargs + ) self.time_step_between_sensing = time_step_between_sensing self.time_since_last_sensing = 0 self.history = [] - self.store_history = store_history + self.store_history = False self.show = show self.target_team = target_team @@ -50,199 +50,28 @@ def __init__( self.r = distance - def checkForLOSCollisions(self, world: RectangularWorld) -> None: - # Mathematics obtained from Sundaram Ramaswamy - # https://legends2k.github.io/2d-fov/design.html - # See section 3.1.1.2 - self.time_since_last_sensing += 1 - if self.time_since_last_sensing % self.time_step_between_sensing != 0: - # Our sensing rate occurs less frequently than our dt physics update, so we need to - # only check for LOS collisions every n timesteps. - return - - self.time_since_last_sensing = 0 - - sensor_origin = self.agent.pos - - if world.quad: - # use world.quad that tracks agent positions to retrieve the agents within the minimal rectangle that contains the FOV sector - quadpoints = [point.data for point in world.quad.within_bb( - quads.BoundingBox(*self.getAARectContainingSector(world)))] - agents = np.array([agent for data in quadpoints for agent in data], dtype=object) - else: - agents = np.array(world.population, dtype=object) - # filter agents to those within the sensing radius - if agents.size: # if any agents found - positions = np.array([agent.pos for agent in agents]) - distances = np.linalg.norm(positions - sensor_origin, axis=1) - is_close = distances < self.r - bag = agents[is_close] - else: - bag = [] - - detected = [] - - for other in bag: - if other is self.agent: # skip the agent the sensor is attached to - continue - - if self.target_team and not other.team == self.target_team: - continue - - u = other.pos - sensor_origin # vector to agent - - - # if an agent was in the fov then this function would have returned, so determine the sensing state to be false - self.determineState(False, None, world) - return - - def step(self, world, only_check_goals=False): - super(BinaryFOVSensor, self).step(world=world) - self.checkForLOSCollisions(world=world) - if self.store_history: - if self.agent.agent_in_sight: - self.history.append(int(self.agent.agent_in_sight.name)) - else: - self.history.append(-1) - - def draw(self, screen, offset=((0, 0), 1.0)): - super(BinaryFOVSensor, self).draw(screen, offset) - pan, zoom = np.asarray(offset[0]), np.asarray(offset[1]) - zoom: float - if self.show: - # Draw Sensory Vector (Vision Vector) - sight_color = (255, 0, 0) - if self.current_state == 1: - sight_color = (0, 255, 0) - if self.current_state == 2: - sight_color = (255, 255, 0) - - # draw the whiskers - # length = actual sensor range if agent is selected/highlighted, otherwise draw relative to agent radius - magnitude = self.r if self.agent.is_highlighted else self.agent.radius * 5 - - head = np.asarray(self.agent.getPosition()) * zoom + pan - e_left, e_right = self.getSectorVectors() - e_left, e_right = np.asarray(e_left[:2]), np.asarray(e_right[:2]) - - tail_l = head + magnitude * e_left * zoom - tail_r = head + magnitude * e_right * zoom - - pygame.draw.line(screen, sight_color, head, tail_l) - pygame.draw.line(screen, sight_color, head, tail_r) - if self.agent.is_highlighted: - width = max(1, round(0.01 * zoom)) - # pygame.draw.circle(screen, sight_color + (50,), head, self.r * zoom, width) - # draw the arc of the sensor cone - range_bbox = AABB.from_center_wh(head, self.r * 2 * zoom) - langle = self.agent.angle + self.angle + self.theta - rangle = self.agent.angle + self.angle - self.theta - pygame.draw.arc(screen, sight_color + (50,), range_bbox.to_rect(), -langle, -rangle, width) - - if not self.DEBUG: - return - # DEBUG DRAWINGS: - if self.wall_sensing_range: - pygame.draw.circle(screen, (150, 150, 150, 50), head, self.wall_sensing_range * zoom, width) - AAR = self.getAARectContainingSector(self.agent.world) - AARtl = np.array(AAR[:2]) * zoom + pan - AARbr = np.array(AAR[2:]) * zoom + pan - pygame.draw.rect(screen, sight_color + (50,), pygame.Rect(*AARtl, *(AARbr - AARtl)), width) - detected = [point.data for point in self.agent.world.quad.within_bb(quads.BoundingBox(*AAR))] - for agent in detected: - pygame.draw.circle(screen, pygame.colordict.THECOLORS["blue"], agent.pos * zoom + pan, agent.radius * zoom, width * 3) - - # this function has been replaced by a more efficient procedure in checkForLOSCollisions and is no longer called there - def circle_interesect_sensing_cone(self, u, r): - e_left, e_right = self.getSectorVectors() - directional = np.dot(u, self.getBiasedSightAngle()) - if directional > 0: - u = np.append(u, [0]) - cross_l = np.cross(e_left, u) - cross_r = np.cross(u, e_right) - sign_l = np.sign(cross_l) - sign_r = np.sign(cross_r) - added_signs = sign_l - sign_r - sector_boundaries = np.all(added_signs == 0) - if sector_boundaries: - d_to_inter = np.linalg.norm(u) - return d_to_inter - - # It may also be the case that the center of the agent is not within the FOV, but that some part of the - # circle is visible and on the edges of the left and right viewing vectors. - # LinAlg Calculations obtained from https://www.bluebill.net/circle_ray_intersection.html - - # u, defined earlier is the vector from the point of interest to the center of the circle - # Project u onto e_left and e_right - u_l = np.dot(u, e_left) * e_left - u_r = np.dot(u, e_right) * e_right - - # Determine the minimum distance between the agent's center (center of circle) and the projected vector - dist_l = np.linalg.norm(u - u_l) - dist_r = np.linalg.norm(u - u_r) - - radius = r # Note: Assumes homogenous radius - if dist_l < radius: - d_to_inter = np.linalg.norm(u) - return d_to_inter - if dist_r < radius: - d_to_inter = np.linalg.norm(u) - return d_to_inter - return None - - def withinRadiusExclusiveFast(self, origin, other, radius): - diff = origin - other - return np.dot(diff, diff) < radius**2 - - def getLOSVector(self) -> List: - if self.angle is None: - return self.agent.orientation_uvec() - - return [ - math.cos(self.angle + self.agent.angle), - math.sin(self.angle + self.agent.angle) - ] - - def getBiasedSightAngle(self): - angle: float = self.agent.angle + self.bias - return vectorize(angle) - - def getSectorVectors(self): - angle: float = self.agent.angle + self.bias - span: float = self.theta + def get_positions(self, world: RectangularWorld) -> None: + bag = self.get_agents_within_range(world) + positions = np.array([agent.pos for agent in bag]) + return positions - leftBorder = vectorize(angle + span) - rightBorder = vectorize(angle - span) - return np.append(leftBorder, 0), np.append(rightBorder, 0) + def step(self, world, **kwargs): + self.current_state = self.get_positions(world) def as_config_dict(self): return { - "type": "BinaryFOVSensor", - "theta": self.theta, - "bias": self.bias, - "fp": self.fp, - "fn": self.fn, + "type": type(self).__name__, "time_step_between_sensing": self.time_step_between_sensing, "store_history": self.store_history, - "use_goal_state": self.use_goal_state, - "wall_sensing_range": self.wall_sensing_range, - "goal_sensing_range": self.goal_sensing_range, "agent_sensing_range": self.r, - "seed": self.seed, + "target_team": self.target_team, } - @staticmethod - def from_dict(d): - return BinaryFOVSensor( + @classmethod + def from_dict(cls, d): + return cls( parent=None, - theta=d["theta"], distance=d["agent_sensing_range"], - bias=d["bias"], - false_positive=d.get("fp", 0.0), - false_negative=d.get("fn", 0.0), - store_history=d["store_history"], - detect_goal_with_added_state=d["use_goal_state"], - wall_sensing_range=d["wall_sensing_range"], - goal_sensing_range=d["goal_sensing_range"], - seed=d["seed"] if "seed" in d else None, + time_step_between_sensing=d.get("time_step_between_sensing", 1), + target_team=d.get("target_team", None), ) From d370b3c99b914828e83a315b194570886a7afdb5 Mon Sep 17 00:00:00 2001 From: kenblu24 Date: Tue, 23 Sep 2025 15:13:21 -0400 Subject: [PATCH 4/6] add to metrics (cherry picked from commit 35be6e3e212c6c927b6b0e09035f9469b5816262) --- src/swarmsim/config/__init__.py | 2 ++ src/swarmsim/sensors/RelativeAgentSensor.py | 1 + 2 files changed, 3 insertions(+) diff --git a/src/swarmsim/config/__init__.py b/src/swarmsim/config/__init__.py index 090eae7..8f0e2dd 100644 --- a/src/swarmsim/config/__init__.py +++ b/src/swarmsim/config/__init__.py @@ -187,6 +187,7 @@ def add_native_sensors(self): from ..sensors.GenomeDependentSensor import GenomeBinarySensor from ..sensors.RegionalSensor import RegionalSensor from ..sensors.StaticSensor import StaticSensor + from ..sensors.RelativeAgentSensor import RelativeAgentSensor self.add_dictlike_namespace('sensors') @@ -195,6 +196,7 @@ def add_native_sensors(self): self._dictlike_types['sensors']['GenomeBinarySensor'] = GenomeBinarySensor self._dictlike_types['sensors']['RegionalSensor'] = RegionalSensor self._dictlike_types['sensors']['StaticSensor'] = StaticSensor + self._dictlike_types['sensors']['RelativeAgentSensor'] = RelativeAgentSensor def add_native_controllers(self): from ..agent.control.Controller import Controller diff --git a/src/swarmsim/sensors/RelativeAgentSensor.py b/src/swarmsim/sensors/RelativeAgentSensor.py index 19e1445..5bc5e6e 100644 --- a/src/swarmsim/sensors/RelativeAgentSensor.py +++ b/src/swarmsim/sensors/RelativeAgentSensor.py @@ -45,6 +45,7 @@ def __init__( self.store_history = False self.show = show self.target_team = target_team + self.current_state = [] self.detect_only_origins = False From 8280fad138a975f743cfad96e208723e49c27882 Mon Sep 17 00:00:00 2001 From: kenblu24 Date: Tue, 23 Sep 2025 14:04:29 -0400 Subject: [PATCH 5/6] fix debug drawing (cherry picked from commit ccaeb920e198be53381ea2ba93f121a1f550b87d) --- src/swarmsim/sensors/RangedSensor.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/swarmsim/sensors/RangedSensor.py b/src/swarmsim/sensors/RangedSensor.py index 79f39c0..51f89f3 100644 --- a/src/swarmsim/sensors/RangedSensor.py +++ b/src/swarmsim/sensors/RangedSensor.py @@ -107,9 +107,9 @@ def draw(self, screen, offset=((0, 0), 1.0)): if not self.DEBUG: return # DEBUG DRAWINGS: - AAR = self.make_bounding_box() * zoom + pan - AARtl = AAR[:2] - AARbr = AAR[2:] + AAR = self.make_bounding_box() * zoom + AARtl = AAR[:2] + pan + AARbr = AAR[2:] + pan pygame.draw.rect(screen, color + (50,), pygame.Rect(*AARtl, *(AARbr - AARtl)), width) detected = self.agents_within_range for agent in detected: From 762542acb33b8f89071753f7dadae5ca5e568794 Mon Sep 17 00:00:00 2001 From: kenblu24 Date: Tue, 23 Sep 2025 14:26:30 -0400 Subject: [PATCH 6/6] fix broken sensor ranging (cherry picked from commit 0bc7dc1d9ce96f4f058ab4512cc4f181d905648f) --- src/swarmsim/sensors/RangedSensor.py | 2 +- tests/sensors/test_sensors.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/swarmsim/sensors/RangedSensor.py b/src/swarmsim/sensors/RangedSensor.py index 51f89f3..c2ab648 100644 --- a/src/swarmsim/sensors/RangedSensor.py +++ b/src/swarmsim/sensors/RangedSensor.py @@ -83,7 +83,7 @@ def get_agents_within_range(self, world: RectangularWorld) -> None: if self.target_team: bag = [agent for agent in bag if agent.team == self.target_team] else: - bag = agents.tolist() + bag = [agent for agent in bag] self.agents_within_range = bag return bag diff --git a/tests/sensors/test_sensors.py b/tests/sensors/test_sensors.py index ab796ee..c65a9d0 100644 --- a/tests/sensors/test_sensors.py +++ b/tests/sensors/test_sensors.py @@ -16,7 +16,7 @@ def stop_after_one_step(world: RectangularWorld) -> bool: def setup_common_world(world_setup: dict) -> RectangularWorld: world_config = RectangularWorldConfig(**world_setup) world = RectangularWorld(world_config) - main(world, show_gui=False, stop_detection=stop_after_one_step) + main(world, show_gui=False, stop_detection=stop_after_one_step, start_paused=False) # > Did world step only once? assert world.total_steps == 1