diff --git a/src/swarmsim/config/__init__.py b/src/swarmsim/config/__init__.py index 090eae7f..8f0e2dd4 100644 --- a/src/swarmsim/config/__init__.py +++ b/src/swarmsim/config/__init__.py @@ -187,6 +187,7 @@ def add_native_sensors(self): from ..sensors.GenomeDependentSensor import GenomeBinarySensor from ..sensors.RegionalSensor import RegionalSensor from ..sensors.StaticSensor import StaticSensor + from ..sensors.RelativeAgentSensor import RelativeAgentSensor self.add_dictlike_namespace('sensors') @@ -195,6 +196,7 @@ def add_native_sensors(self): self._dictlike_types['sensors']['GenomeBinarySensor'] = GenomeBinarySensor self._dictlike_types['sensors']['RegionalSensor'] = RegionalSensor self._dictlike_types['sensors']['StaticSensor'] = StaticSensor + self._dictlike_types['sensors']['RelativeAgentSensor'] = RelativeAgentSensor def add_native_controllers(self): from ..agent.control.Controller import Controller diff --git a/src/swarmsim/sensors/AbstractSensor.py b/src/swarmsim/sensors/AbstractSensor.py index 515aaa9a..57164a17 100644 --- a/src/swarmsim/sensors/AbstractSensor.py +++ b/src/swarmsim/sensors/AbstractSensor.py @@ -13,8 +13,16 @@ class AbstractSensor: config_vars = ['static_position', 'n_possible_states', 'show'] - def __init__(self, agent=None, parent=None, static_position=None, n_possible_states=0, - draw=True, seed=None, **kwargs): + def __init__( + self, + agent=None, + parent=None, + static_position=None, + n_possible_states=0, + draw=True, + seed=None, + **kwargs + ): """Sensor class for the agent. Sensors should typically have a parent that is assigned to them that must be of subclass 'Agent' diff --git a/src/swarmsim/sensors/BinaryFOVSensor.py b/src/swarmsim/sensors/BinaryFOVSensor.py index 6c97f184..46974715 100644 --- a/src/swarmsim/sensors/BinaryFOVSensor.py +++ b/src/swarmsim/sensors/BinaryFOVSensor.py @@ -2,7 +2,7 @@ import pygame import numpy as np import math -from .AbstractSensor import AbstractSensor +from .RangedSensor import RangedSensor from typing import List from ..world.goals.Goal import CylinderGoal from ..util.collider.AABB import AABB @@ -19,13 +19,18 @@ # convert an angle to a representative unit vector def vectorize(angle): - return np.array((np.cos(angle), np.sin(angle))) + return np.asarray((np.cos(angle), np.sin(angle))) # compute the vector turn value from origin→p1 to origin→p2 # this value is positive if a left turn is the fastest way to go from p1 to p2, zero if p1 and p2 are colinear, and negative otherwise def turn(p1, p2): - return p1[0] * p2[1] - p2[0] * p1[1] + return np.linalg.det(np.vstack((p1[:2], p2[:2]))) + + +def rad02pi(angle: float | np.ndarray) -> np.ndarray | float: + x = np.fmod(angle, 2 * np.pi) + return np.where(x < 0, 2 * np.pi + x, x) # project vector a onto vector b @@ -39,14 +44,32 @@ def lineCircleIntersect(line, center, radius): return np.dot(clDiffVec, clDiffVec) <= radius**2 -class BinaryFOVSensor(AbstractSensor): - config_vars = AbstractSensor.config_vars + [ - 'theta', 'distance', 'bias', 'false_positive', 'false_negative', +PIC = 2 * np.pi +axes_angles = np.array([0, np.pi / 2, np.pi, 3 * np.pi / 2]) +axes_points = np.array([ + [1, 0], + [0, 1], + [-1, 0], + [0, -1], +]) +zeros2 = np.zeros(2) + + +def axes_in_fov(lower, upper): + # used to help expand the AABB to include the arc of the sensor cone + # explanation: https://stackoverflow.com/questions/66799475/how-to-elegantly-find-if-an-angle-is-between-a-range + mask = np.remainder(axes_angles - lower, PIC) > np.remainder(upper - lower, PIC) + return axes_points[mask] + + +class BinaryFOVSensor(RangedSensor): + config_vars = RangedSensor.config_vars + [ + 'theta', 'bias', 'false_positive', 'false_negative', 'walls', 'wall_sensing_range', 'time_step_between_sensing', 'invert', 'store_history', 'detect_goal_with_added_state', 'show', 'target_team' ] - DEBUG = False + DEBUG = True def __init__( self, @@ -64,12 +87,22 @@ def __init__( invert=False, store_history=False, detect_goal_with_added_state=False, + detect_only_origins=False, show=True, seed=None, target_team=None, **kwargs ): - super().__init__(agent=agent, parent=parent, seed=seed, **kwargs) + super().__init__( + agent=agent, + parent=parent, + distance=distance, + time_step_between_sensing=time_step_between_sensing, + detect_only_origins=detect_only_origins, + target_team=target_team, + seed=seed, + **kwargs + ) self.angle = 0.0 self.theta = theta self.bias = bias @@ -89,7 +122,7 @@ def __init__( self.detection_id = 0 self.target_team = target_team - self.detect_only_origins = False + self.detect_only_origins = detect_only_origins NOTFOUND = object() if (degrees := kwargs.pop('degrees', NOTFOUND)) is not NOTFOUND: @@ -103,31 +136,8 @@ def checkForLOSCollisions(self, world: RectangularWorld) -> None: # Mathematics obtained from Sundaram Ramaswamy # https://legends2k.github.io/2d-fov/design.html # See section 3.1.1.2 - self.time_since_last_sensing += 1 - if self.time_since_last_sensing % self.time_step_between_sensing != 0: - # Our sensing rate occurs less frequently than our dt physics update, so we need to - # only check for LOS collisions every n timesteps. - return - - self.time_since_last_sensing = 0 - - sensor_origin = self.agent.pos - if world.quad: - # use world.quad that tracks agent positions to retrieve the agents within the minimal rectangle that contains the FOV sector - quadpoints = [point.data for point in world.quad.within_bb( - quads.BoundingBox(*self.getAARectContainingSector(world)))] - agents = np.array([agent for data in quadpoints for agent in data], dtype=object) - else: - agents = np.array(world.population, dtype=object) - # filter agents to those within the sensing radius - if agents.size: - positions = np.array([agent.pos for agent in agents]) - distances = np.linalg.norm(positions - sensor_origin, axis=1) - is_close = distances < self.r - bag = agents[is_close] - else: - bag = [] + bag = self.get_agents_within_range(world) # get left and right whiskers e_left, e_right = self.getSectorVectors() @@ -136,10 +146,11 @@ def checkForLOSCollisions(self, world: RectangularWorld) -> None: # TODO: Rewrite all this to use WorldObjects # see https://www.geeksforgeeks.org/check-if-two-given-line-segments-intersect/ consideration_set = [] + origin = self.position if self.walls is not None: # Get e_left, e_right line_segments - l = [sensor_origin, sensor_origin + (e_left[:2] * self.wall_sensing_range)] # noqa: E741 - r = [sensor_origin, sensor_origin + (e_right[:2] * self.wall_sensing_range)] + l = [origin, origin + (e_left[:2] * self.wall_sensing_range)] # noqa: E741 + r = [origin, origin + (e_right[:2] * self.wall_sensing_range)] wall_top = [self.walls[0], [self.walls[1][0], self.walls[0][1]]] wall_right = [[self.walls[1][0], self.walls[0][1]], self.walls[1]] wall_bottom = [self.walls[1], [self.walls[0][0], self.walls[1][1]]] @@ -150,7 +161,7 @@ def checkForLOSCollisions(self, world: RectangularWorld) -> None: for line in [l, r]: if self.lines_segments_intersect(line, wall): d_to_inter = np.linalg.norm( - np.array(self.line_seg_int_point(line, wall)) - np.array(sensor_origin)) + np.array(self.line_seg_int_point(line, wall)) - np.array(origin)) consideration_set.append((d_to_inter, None)) # Detect for World Objects @@ -180,7 +191,7 @@ def checkForLOSCollisions(self, world: RectangularWorld) -> None: if self.target_team and not agent.team == self.target_team: continue - u = agent.getPosition() - sensor_origin # vector to agent + u = agent.getPosition() - origin # vector to agent leftTurn = turn(u, e_left) rightTurn = turn(u, e_right) @@ -208,73 +219,33 @@ def checkForLOSCollisions(self, world: RectangularWorld) -> None: self.determineState(False, None, world) return + def make_bounding_box(self): + return self.getAARectContainingSector(self.agent.world) # get the smallest rectangle that contains the sensor fov sector + # get the smallest rectangle that contains the sensor fov sector def getAARectContainingSector(self, world: RectangularWorld): angle: float = self.agent.angle + self.bias # global sensor angle span: float = self.theta # angle fov sweeps to either side radius: float = self.r # view radius - position: list[float] = self.agent.pos.tolist() # agent global position - - over180 = np.pi <= span * 2 # true if 180 <= FOV - - center = vectorize(angle) # vector representing absolute look direction - leftWhisker = vectorize(angle + span) # vector representing left whisker - rightWhisker = vectorize(angle - span) # vector representing right whisker - xaxis = (1, 0) # vector representing positive x axis - yaxis = (0, 1) # vector representing positive y axis - - xts = np.sign(turn(xaxis, center)) # sign of turn from x axis to look direction - fovOverXAxis = np.sign(turn(xaxis, leftWhisker)) != np.sign(turn(xaxis, rightWhisker)) # true if turns from x axis to whiskers have different signs - - yts = np.sign(turn(yaxis, center)) # sign of turn from y axis to look direction - fovOverYAxis = np.sign(turn(yaxis, leftWhisker)) != np.sign(turn(yaxis, rightWhisker)) # true if turns from y axis to whiskers have different signs - - xmin = 0 - xmax = 0 - ymin = 0 - ymax = 0 - - def xadd(val): # extend either xmin or xmax if outside current range - nonlocal xmin - if val < xmin: - xmin = val - nonlocal xmax - if xmax < val: - xmax = val - - def yadd(val): # extend either ymin or ymax if outside current range - nonlocal ymin - if val < ymin: - ymin = val - nonlocal ymax - if ymax < val: - ymax = val - - # consider the x coordinates of the whisker ends - xadd(leftWhisker[0] * radius) - xadd(rightWhisker[0] * radius) - if fovOverXAxis: # if over x axis, x range is maximized to radius either left or right - xadd(radius * -yts) # left or right is determined by the negated sign of the turn from the positive y axis to the look direction - if over180: # if also over 180, then y range is maximized to radius in the direction closer to the look direction - yadd(radius * xts) - if not fovOverYAxis: # if over x axis, over 180, and not over y axis, y range is maximized in both directions - yadd(radius * -xts) - - # consider the y coordinates of the whisker ends - yadd(leftWhisker[1] * radius) - yadd(rightWhisker[1] * radius) - if fovOverYAxis: # if over y axis, y range is maximized to radius either up or down - yadd(radius * xts) # up or down is determined by the sign of the turn from the positive x axis to the look direction - if over180: # if also over 180, then x range is maximized to radius in the direction closer to the look direction - xadd(radius * -yts) - if not fovOverXAxis: # if over y axis, over 180, and not over x axis, x range is maximized in both directions - xadd(radius * yts) + position = self.position # this padding of the rectangle is to account for and detect agents that would only be seen by the whisker circle intercept correction padding = 0 if self.detect_only_origins else world.max_agent_r + lower = angle + span + upper = angle - span + angles = [lower, upper] + wpoints = vectorize(angles).T + + cpoints = axes_in_fov(lower, upper) # add points to include the arc in the AABB + + points = np.vstack((wpoints, cpoints, zeros2)) + points *= radius + mins = points.min(axis=0) + position - padding + maxs = points.max(axis=0) + position + padding + # positions are relative until now, make them absolute for the return - return [position[0] + xmin - padding, position[1] + ymin - padding, position[0] + xmax + padding, position[1] + ymax + padding] + return [*mins, *maxs] # xmin, ymin, xmax, ymax def check_goals(self, world): @@ -416,7 +387,7 @@ def draw(self, screen, offset=((0, 0), 1.0)): AARtl = np.array(AAR[:2]) * zoom + pan AARbr = np.array(AAR[2:]) * zoom + pan pygame.draw.rect(screen, sight_color + (50,), pygame.Rect(*AARtl, *(AARbr - AARtl)), width) - detected = [point.data for point in self.agent.world.quad.within_bb(quads.BoundingBox(*AAR))] + detected = self.agents_within_range for agent in detected: pygame.draw.circle(screen, pygame.colordict.THECOLORS["blue"], agent.pos * zoom + pan, agent.radius * zoom, width * 3) diff --git a/src/swarmsim/sensors/RangedSensor.py b/src/swarmsim/sensors/RangedSensor.py new file mode 100644 index 00000000..c2ab6481 --- /dev/null +++ b/src/swarmsim/sensors/RangedSensor.py @@ -0,0 +1,116 @@ +from swarmsim.world.RectangularWorld import RectangularWorld +import pygame +import numpy as np +import math +from .AbstractSensor import AbstractSensor +from typing import List +from ..util.collider.AABB import AABB + +from typing import TYPE_CHECKING +if TYPE_CHECKING: + from ..world.World import World +else: + World = None + +import warnings +import quads + + +class RangedSensor(AbstractSensor): + config_vars = AbstractSensor.config_vars + [ + 'distance', 'time_step_between_sensing', 'detect_only_origins', + 'show', 'target_team' + ] + + DEBUG = False + + def __init__( + self, + agent=None, + parent=None, + + distance=100.0, + time_step_between_sensing=1, + detect_only_origins=False, + target_team=None, + **kwargs + ): + super().__init__(agent=agent, parent=parent, **kwargs) + self.time_step_between_sensing = time_step_between_sensing + self.time_since_last_sensing = 0 + + self.detect_only_origins = detect_only_origins + self.target_team = target_team + self.agents_within_range = [] + self.r = distance + + def ready_to_sense(self): + # Our sensing rate occurs less frequently than our dt physics update, so we need to + # only check for LOS collisions every n timesteps. + self.time_since_last_sensing += 1 + if self.time_since_last_sensing % self.time_step_between_sensing == 0: + self.time_since_last_sensing = 0 + return True + return False + + def make_bounding_box(self): + mins = self.agent.pos - self.r + maxs = self.agent.pos + self.r + return *mins, *maxs + + def get_agents_within_range(self, world: RectangularWorld) -> None: + + infinite_range = self.r == float('inf') + + if world.quad and not infinite_range: + # use world.quad that tracks agent positions to retrieve the agents within the minimal rectangle that contains the FOV sector + quadpoints = [point.data for point in world.quad.within_bb( + quads.BoundingBox(*self.make_bounding_box()))] + agents = np.array([agent for data in quadpoints for agent in data], dtype=object) + else: + agents = np.array(world.population, dtype=object) + # filter agents to those within the sensing radius + if infinite_range: + bag = agents + elif agents.size: # if any agents found + positions = np.array([agent.pos for agent in agents]) + distances = np.linalg.norm(positions - self.position, axis=1) + is_close = distances < self.r + bag = agents[is_close] + else: + bag = [] + + if self.target_team: + bag = [agent for agent in bag if agent.team == self.target_team] + else: + bag = [agent for agent in bag] + + self.agents_within_range = bag + return bag + + def draw(self, screen, offset=((0, 0), 1.0)): + super().draw(screen, offset) + pan, zoom = np.asarray(offset[0]), np.asarray(offset[1]) + zoom: float + if self.show: + # Draw Sensory Vector (Vision Vector) + color = (150, 150, 150) + spos = self.position * zoom + pan + + # draw the whiskers + # length = actual sensor range if agent is selected/highlighted, otherwise draw relative to agent radius + if self.agent.is_highlighted: + width = max(1, round(0.01 * zoom)) + pygame.draw.circle(screen, color + (50,), spos, self.r * zoom, width) + # draw the arc of the sensor cone + + if not self.DEBUG: + return + # DEBUG DRAWINGS: + AAR = self.make_bounding_box() * zoom + AARtl = AAR[:2] + pan + AARbr = AAR[2:] + pan + pygame.draw.rect(screen, color + (50,), pygame.Rect(*AARtl, *(AARbr - AARtl)), width) + detected = self.agents_within_range + for agent in detected: + pygame.draw.circle(screen, pygame.colordict.THECOLORS["blue"], agent.pos * zoom + pan, agent.radius * zoom, width * 3) diff --git a/src/swarmsim/sensors/RelativeAgentSensor.py b/src/swarmsim/sensors/RelativeAgentSensor.py new file mode 100644 index 00000000..5bc5e6e4 --- /dev/null +++ b/src/swarmsim/sensors/RelativeAgentSensor.py @@ -0,0 +1,78 @@ +from swarmsim.world.RectangularWorld import RectangularWorld +import pygame +import numpy as np +import math +from .RangedSensor import RangedSensor +from typing import List +from ..util.collider.AABB import AABB + +from typing import TYPE_CHECKING +if TYPE_CHECKING: + from ..world.World import World +else: + World = None + +import warnings +import quads + + +class RelativeAgentSensor(RangedSensor): + DEBUG = False + + def __init__( + self, + agent=None, + parent=None, + distance=100.0, + time_step_between_sensing=1, + show=True, + target_team=None, + + **kwargs + ): + super().__init__( + agent=agent, + parent=parent, + distance=distance, + time_step_between_sensing=time_step_between_sensing, + detect_only_origins=True, + target_team=target_team, + **kwargs + ) + self.time_step_between_sensing = time_step_between_sensing + self.time_since_last_sensing = 0 + self.history = [] + self.store_history = False + self.show = show + self.target_team = target_team + self.current_state = [] + + self.detect_only_origins = False + + self.r = distance + + def get_positions(self, world: RectangularWorld) -> None: + bag = self.get_agents_within_range(world) + positions = np.array([agent.pos for agent in bag]) + return positions + + def step(self, world, **kwargs): + self.current_state = self.get_positions(world) + + def as_config_dict(self): + return { + "type": type(self).__name__, + "time_step_between_sensing": self.time_step_between_sensing, + "store_history": self.store_history, + "agent_sensing_range": self.r, + "target_team": self.target_team, + } + + @classmethod + def from_dict(cls, d): + return cls( + parent=None, + distance=d["agent_sensing_range"], + time_step_between_sensing=d.get("time_step_between_sensing", 1), + target_team=d.get("target_team", None), + ) diff --git a/tests/sensors/test_sensors.py b/tests/sensors/test_sensors.py index ab796ee9..c65a9d05 100644 --- a/tests/sensors/test_sensors.py +++ b/tests/sensors/test_sensors.py @@ -16,7 +16,7 @@ def stop_after_one_step(world: RectangularWorld) -> bool: def setup_common_world(world_setup: dict) -> RectangularWorld: world_config = RectangularWorldConfig(**world_setup) world = RectangularWorld(world_config) - main(world, show_gui=False, stop_detection=stop_after_one_step) + main(world, show_gui=False, stop_detection=stop_after_one_step, start_paused=False) # > Did world step only once? assert world.total_steps == 1