diff --git a/.github/workflows/run_checks_build_and_test.yml b/.github/workflows/run_checks_build_and_test.yml index 89d8c20..0d47126 100644 --- a/.github/workflows/run_checks_build_and_test.yml +++ b/.github/workflows/run_checks_build_and_test.yml @@ -16,20 +16,6 @@ jobs: - uses: actions/setup-python@v5 - uses: pre-commit/action@v3.0.1 - pylint: - runs-on: ubuntu-latest - steps: - - uses: actions/checkout@v4 - - uses: actions/setup-python@v5 - - name: install Pylint and plugin - run: | - python -m pip install --upgrade pip - pip install pytest pylint pylint-per-file-ignores - pip install -e . - - name: run Pylint for errors, warnings and remarks only (ignore Comments/ Code style) - run: | - pylint --disable=C test_shapefile.py src/shapefile.py - build_wheel_and_sdist: runs-on: ubuntu-latest steps: diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 1ef3313..e12617a 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -19,6 +19,6 @@ repos: - id: check-yaml - id: trailing-whitespace - repo: https://github.com/pre-commit/mirrors-mypy - rev: v1.17.0 + rev: v1.17.1 hooks: - id: mypy \ No newline at end of file diff --git a/README.md b/README.md index 66951be..0c1e848 100644 --- a/README.md +++ b/README.md @@ -467,7 +467,8 @@ index which is 7. >>> s = sf.shape(7) >>> s - Shape #7: POLYGON + Polygon #7 + >>> # Read the bbox of the 8th shape to verify >>> # Round coordinates to 3 decimal places @@ -476,7 +477,7 @@ index which is 7. Each shape record (except Points) contains the following attributes. Records of shapeType Point do not have a bounding box 'bbox'. - +# TODO!! Fix attributes >>> for name in dir(shapes[3]): ... if not name.startswith('_'): diff --git a/pyproject.toml b/pyproject.toml index 7d043ae..00a755f 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -27,7 +27,6 @@ classifiers = [ "Topic :: Software Development :: Libraries :: Python Modules", ] dependencies = [ - "typing_extensions", ] [project.optional-dependencies] @@ -47,6 +46,12 @@ sources = {"src" = ""} # move from "src" directory for wheel [tool.hatch.version] path = "src/shapefile.py" +[tool.pytest.ini_options] +markers = [ + "network: marks tests requiring network access", + "slow: marks other tests that cause bottlenecks", +] + [tool.ruff] # Exclude a variety of commonly ignored directories. exclude = [ @@ -111,35 +116,3 @@ skip-magic-trailing-comma = false line-ending = "auto" -[tool.pylint.MASTER] -load-plugins=[ - "pylint_per_file_ignores", -] - -# Silence warnings: src/shapefile.py:2076:20: W0212: Access to a protected member _from_geojson of a client class (protected-access) -# src/shapefile.py:950:16: W0201: Attribute 'm' defined outside __init__ (attribute-defined-outside-init) -# src/shapefile.py:973:12: W0707: Consider explicitly re-raising using 'except error as exc' and -# 'raise ShapefileException(f'Failed to write bounding box for record {i}. -# Expected floats.') from exc' (raise-missing-from) -# Silence remarks: -# src\shapefile.py:338:0: R0914: Too many local variables (21/15) (too-many-locals) -# src\shapefile.py:338:0: R0912: Too many branches (24/12) (too-many-branches) -# src\shapefile.py:338:0: R0915: Too many statements (52/50) (too-many-statements) -# src\shapefile.py:470:0: R0902: Too many instance attributes (9/7) (too-many-instance-attributes) -# src\shapefile.py:471:4: R0913: Too many arguments (6/5) (too-many-arguments) -# src\shapefile.py:471:4: R0917: Too many positional arguments (6/5) (too-many-positional-arguments) -# src\shapefile.py:506:4: R0911: Too many return statements (10/6) (too-many-return-statements) -# src\shapefile.py:878:0: R0903: Too few public methods (0/2) (too-few-public-methods) -# src\shapefile.py:1981:0: R0904: Too many public methods (23/20) (too-many-public-methods) -# src\shapefile.py:2117:17: R1732: Consider using 'with' for resource-allocating operations (consider-using-with) -# Silence warnings: test_shapefile.py:{783,786,799,803,06,1195}:19: -# W0212: Access to a protected member _offsets of a -# client class (protected-access) -# -# Toml multi-line string used instead of array due to: -# https://github.com/christopherpickering/pylint-per-file-ignores/issues/160 -[tool.pylint.'messages control'] -per-file-ignores = [ - "/src/shapefile.py:W0707,W0212,W0201,R0902,R0903,R0904,R0911,R0912,R0913,R0914,R0915,R0917,R1732", - "test_shapefile.py:W0212,R1732", -] diff --git a/pytest.ini b/pytest.ini deleted file mode 100644 index 39fbfae..0000000 --- a/pytest.ini +++ /dev/null @@ -1,4 +0,0 @@ -[pytest] -markers = - network: marks tests requiring network access - slow: marks other tests that cause bottlenecks diff --git a/src/shapefile.py b/src/shapefile.py index 2c741d0..316db0c 100644 --- a/src/shapefile.py +++ b/src/shapefile.py @@ -31,6 +31,7 @@ Iterator, Literal, NamedTuple, + NoReturn, Optional, Protocol, Reversible, @@ -45,8 +46,6 @@ from urllib.parse import urlparse, urlunparse from urllib.request import Request, urlopen -from typing_extensions import Never, NotRequired, Self, TypeIs - # Create named logger logger = logging.getLogger(__name__) @@ -77,22 +76,24 @@ MULTIPATCH = 31 SHAPETYPE_LOOKUP = { - 0: "NULL", - 1: "POINT", - 3: "POLYLINE", - 5: "POLYGON", - 8: "MULTIPOINT", - 11: "POINTZ", - 13: "POLYLINEZ", - 15: "POLYGONZ", - 18: "MULTIPOINTZ", - 21: "POINTM", - 23: "POLYLINEM", - 25: "POLYGONM", - 28: "MULTIPOINTM", - 31: "MULTIPATCH", + NULL: "NULL", + POINT: "POINT", + POLYLINE: "POLYLINE", + POLYGON: "POLYGON", + MULTIPOINT: "MULTIPOINT", + POINTZ: "POINTZ", + POLYLINEZ: "POLYLINEZ", + POLYGONZ: "POLYGONZ", + MULTIPOINTZ: "MULTIPOINTZ", + POINTM: "POINTM", + POLYLINEM: "POLYLINEM", + POLYGONM: "POLYGONM", + MULTIPOINTM: "MULTIPOINTM", + MULTIPATCH: "MULTIPATCH", } +SHAPETYPENUM_LOOKUP = {name: code for code, name in SHAPETYPE_LOOKUP.items()} + TRIANGLE_STRIP = 0 TRIANGLE_FAN = 1 OUTER_RING = 2 @@ -127,25 +128,9 @@ MBox = tuple[float, float] ZBox = tuple[float, float] -# class BBox(NamedTuple): -# xmin: float -# ymin: float -# xmax: float -# ymax: float - - -# class MBox(NamedTuple): -# mmin: Optional[float] -# mmax: Optional[float] - - -# class ZBox(NamedTuple): -# zmin: float -# zmax: float - class WriteableBinStream(Protocol): - def write(self, b: bytes): ... # pylint: disable=redefined-outer-name + def write(self, b: bytes): ... class ReadableBinStream(Protocol): @@ -153,20 +138,20 @@ def read(self, size: int = -1): ... class WriteSeekableBinStream(Protocol): - def write(self, b: bytes): ... # pylint: disable=redefined-outer-name - def seek(self, offset: int, whence: int = 0): ... # pylint: disable=unused-argument + def write(self, b: bytes): ... + def seek(self, offset: int, whence: int = 0): ... def tell(self): ... class ReadSeekableBinStream(Protocol): - def seek(self, offset: int, whence: int = 0): ... # pylint: disable=unused-argument + def seek(self, offset: int, whence: int = 0): ... def tell(self): ... def read(self, size: int = -1): ... class ReadWriteSeekableBinStream(Protocol): - def write(self, b: bytes): ... # pylint: disable=redefined-outer-name - def seek(self, offset: int, whence: int = 0): ... # pylint: disable=unused-argument + def write(self, b: bytes): ... + def seek(self, offset: int, whence: int = 0): ... def tell(self): ... def read(self, size: int = -1): ... @@ -182,8 +167,6 @@ def read(self, size: int = -1): ... class FieldType: """A bare bones 'enum', as the enum library noticeably slows performance.""" - # __slots__ = ["C", "D", "F", "L", "M", "N", "__members__"] - C: Final = "C" # "Character" # (str) D: Final = "D" # "Date" F: Final = "F" # "Floating point" @@ -197,13 +180,7 @@ class FieldType: "L", "M", "N", - } # set(__slots__) - {"__members__"} - - # def raise_if_invalid(field_type: Hashable): - # if field_type not in FieldType.__members__: - # raise ShapefileException( - # f"field_type must be in {{FieldType.__members__}}. Got: {field_type=}. " - # ) + } FIELD_TYPE_ALIASES: dict[Union[str, bytes], FieldTypeT] = {} @@ -228,7 +205,7 @@ def from_unchecked( field_type: Union[str, bytes, FieldTypeT] = "C", size: int = 50, decimal: int = 0, - ) -> Self: + ) -> Field: try: type_ = FIELD_TYPE_ALIASES[field_type] except KeyError: @@ -346,12 +323,8 @@ class GeoJSONFeatureCollection(TypedDict): class GeoJSONFeatureCollectionWithBBox(GeoJSONFeatureCollection): - # bbox is optional - # typing.NotRequired requires Python 3.11 - # and we must support 3.9 (at least until October) - # https://docs.python.org/3/library/typing.html#typing.Required - # Is there a backport? - bbox: NotRequired[list[float]] + # bbox is technically optional under the spec + bbox: list[float] # Helpers @@ -684,34 +657,55 @@ class _NoShapeTypeSentinel: """ -class Shape: - shapeType: int = NULL - _shapeTypes = frozenset( - [ - NULL, - POINT, - POINTM, - POINTZ, - POLYLINE, - POLYLINEM, - POLYLINEZ, - POLYGON, - POLYGONM, - POLYGONZ, - MULTIPOINT, - MULTIPOINTM, - MULTIPOINTZ, - MULTIPATCH, - ] - ) +def _m_from_point(point: Union[PointMT, PointZT], mpos: int) -> Optional[float]: + if len(point) > mpos and point[mpos] is not None: + return cast(float, point[mpos]) + return None + + +def _ms_from_points( + points: Union[list[PointMT], list[PointZT]], mpos: int +) -> Iterator[Optional[float]]: + return (_m_from_point(p, mpos) for p in points) + + +def _z_from_point(point: PointZT) -> float: + if len(point) >= 3 and point[2] is not None: + return point[2] + return 0.0 + + +def _zs_from_points(points: Iterable[PointZT]) -> Iterator[float]: + return (_z_from_point(p) for p in points) + + +class CanHaveBboxNoLinesKwargs(TypedDict, total=False): + oid: Optional[int] + points: Optional[PointsT] + parts: Optional[Sequence[int]] # index of start point of each part + partTypes: Optional[Sequence[int]] + bbox: Optional[BBox] + m: Optional[Sequence[Optional[float]]] + z: Optional[Sequence[float]] + mbox: Optional[MBox] + zbox: Optional[ZBox] + +class Shape: def __init__( self, shapeType: Union[int, _NoShapeTypeSentinel] = _NoShapeTypeSentinel(), points: Optional[PointsT] = None, - parts: Optional[Sequence[int]] = None, + parts: Optional[Sequence[int]] = None, # index of start point of each part + lines: Optional[list[PointsT]] = None, partTypes: Optional[Sequence[int]] = None, oid: Optional[int] = None, + *, + m: Optional[Sequence[Optional[float]]] = None, + z: Optional[Sequence[float]] = None, + bbox: Optional[BBox] = None, + mbox: Optional[MBox] = None, + zbox: Optional[ZBox] = None, ): """Stores the geometry of the different shape types specified in the Shapefile spec. Shape types are @@ -724,30 +718,150 @@ def __init__( list of shapes. For MultiPatch geometry, partTypes designates the patch type of each of the parts. """ + # Preserve previous behaviour for anyone who set self.shapeType = None if not isinstance(shapeType, _NoShapeTypeSentinel): self.shapeType = shapeType - self.points: PointsT = points or [] - self.parts: Sequence[int] = parts or [] - if partTypes: + else: + class_name = self.__class__.__name__ + self.shapeType = SHAPETYPENUM_LOOKUP.get(class_name.upper(), NULL) + + if partTypes is not None: self.partTypes = partTypes - # and a dict to silently record any errors encountered + default_points: PointsT = [] + default_parts: list[int] = [] + + # Make sure polygon rings (parts) are closed + if lines is not None: + if self.shapeType in Polygon_shapeTypes: + lines = list(lines) + self._ensure_polygon_rings_closed(lines) + + default_points, default_parts = self._points_and_parts_indexes_from_lines( + lines + ) + elif points and self.shapeType in _CanHaveBBox_shapeTypes: + # TODO: Raise issue. + # This ensures Polylines, Polygons and Multipatches with no part information are a single + # Polyline, Polygon or Multipatch respectively. + # + # However this also allows MultiPoints shapes to have a single part index 0 as + # documented in README.md,also when set from points + # (even though this is just an artefact of initialising them as a length-1 nested + # list of points via _points_and_parts_indexes_from_lines). + # + # Alternatively single points could be given parts = [0] too, as they do if formed + # _from_geojson. + default_parts = [0] + + self.points: PointsT = points or default_points + + self.parts: Sequence[int] = parts or default_parts + + # and a dict to silently record any errors encountered in GeoJSON self._errors: dict[str, int] = {} # add oid - if oid is not None: - self.__oid = oid + self.__oid: int = -1 if oid is None else oid + + if bbox is not None: + self.bbox: BBox = bbox + elif len(self.points) >= 2: + self.bbox = self._bbox_from_points() + + ms_found = True + if m: + self.m: Sequence[Optional[float]] = m + elif self.shapeType in _HasM_shapeTypes: + mpos = 3 if self.shapeType in _HasZ_shapeTypes | PointZ_shapeTypes else 2 + points_m_z = cast(Union[list[PointMT], list[PointZT]], self.points) + self.m = list(_ms_from_points(points_m_z, mpos)) + elif self.shapeType in PointM_shapeTypes: + mpos = 3 if self.shapeType == POINTZ else 2 + point_m_z = cast(Union[PointMT, PointZT], self.points[0]) + self.m = (_m_from_point(point_m_z, mpos),) else: - self.__oid = -1 + ms_found = False + + zs_found = True + if z: + self.z: Sequence[float] = z + elif self.shapeType in _HasZ_shapeTypes: + points_z = cast(list[PointZT], self.points) + self.z = list(_zs_from_points(points_z)) + elif self.shapeType == POINTZ: + point_z = cast(PointZT, self.points[0]) + self.z = (_z_from_point(point_z),) + else: + zs_found = False + + if mbox is not None: + self.mbox: MBox = mbox + elif ms_found: + self.mbox = self._mbox_from_ms() + + if zbox is not None: + self.zbox: ZBox = zbox + elif zs_found: + self.zbox = self._zbox_from_zs() - # self.z: Optional[Union[list[Optional[float]], _Array[float]]] = None - # self.m: Optional[list[Optional[float]]] = None - # self.bbox: Optional[_Array[float]] = None + @staticmethod + def _ensure_polygon_rings_closed( + parts: list[PointsT], # Mutated + ) -> None: + for part in parts: + if part[0] != part[-1]: + part.append(part[0]) + + @staticmethod + def _points_and_parts_indexes_from_lines( + parts: list[PointsT], + ) -> tuple[PointsT, list[int]]: + # Intended for Union[Polyline, Polygon, MultiPoint, MultiPatch] + """From a list of parts (each part a list of points) return + a flattened list of points, and a list of indexes into that + flattened list corresponding to the start of each part. + + Internal method for both multipoints (formed entirely by a single part), + and shapes that have multiple collections of points (each one + a part): (poly)lines, polygons, and multipatchs. + """ + part_indexes: list[int] = [] + points: PointsT = [] + + for part in parts: + # set part index position + part_indexes.append(len(points)) + points.extend(part) + + return points, part_indexes + + def _bbox_from_points(self) -> BBox: + xs: list[float] = [] + ys: list[float] = [] + + for point in self.points: + xs.append(point[0]) + ys.append(point[1]) + + return min(xs), min(ys), max(xs), max(ys) + + def _mbox_from_ms(self) -> MBox: + ms: list[float] = [m for m in self.m if m is not None] + + if not ms: + # only if none of the shapes had m values, should mbox be set to missing m values + ms.append(NODATA) + + return min(ms), max(ms) + + def _zbox_from_zs(self) -> ZBox: + return min(self.z), max(self.z) @property def __geo_interface__(self) -> GeoJSONHomogeneousGeometryObject: - if self.shapeType in [POINT, POINTM, POINTZ]: + if self.shapeType in {POINT, POINTM, POINTZ}: # point if len(self.points) == 0: # the shape has no coordinate information, i.e. is 'empty' @@ -757,7 +871,7 @@ def __geo_interface__(self) -> GeoJSONHomogeneousGeometryObject: return {"type": "Point", "coordinates": self.points[0]} - if self.shapeType in [MULTIPOINT, MULTIPOINTM, MULTIPOINTZ]: + if self.shapeType in {MULTIPOINT, MULTIPOINTM, MULTIPOINTZ}: if len(self.points) == 0: # the shape has no coordinate information, i.e. is 'empty' # the geojson spec does not define a proper null-geometry type @@ -770,7 +884,7 @@ def __geo_interface__(self) -> GeoJSONHomogeneousGeometryObject: "coordinates": self.points, } - if self.shapeType in [POLYLINE, POLYLINEM, POLYLINEZ]: + if self.shapeType in {POLYLINE, POLYLINEM, POLYLINEZ}: if len(self.parts) == 0: # the shape has no coordinate information, i.e. is 'empty' # the geojson spec does not define a proper null-geometry type @@ -795,11 +909,12 @@ def __geo_interface__(self) -> GeoJSONHomogeneousGeometryObject: coordinates.append(list(self.points[ps:part])) ps = part - # assert len(self.parts) >1 # so disable pylint rule - coordinates.append(list(self.points[part:])) # pylint: disable=undefined-loop-variable + # assert len(self.parts) > 1 + # from previous if len(self.parts) checks so part is defined + coordinates.append(list(self.points[part:])) return {"type": "MultiLineString", "coordinates": coordinates} - if self.shapeType in [POLYGON, POLYGONM, POLYGONZ]: + if self.shapeType in {POLYGON, POLYGONM, POLYGONZ}: if len(self.parts) == 0: # the shape has no coordinate information, i.e. is 'empty' # the geojson spec does not define a proper null-geometry type @@ -860,22 +975,21 @@ def __geo_interface__(self) -> GeoJSONHomogeneousGeometryObject: @staticmethod def _from_geojson(geoj) -> Shape: # create empty shape - shape = Shape() # set shapeType geojType = geoj["type"] if geoj else "Null" if geojType in GEOJSON_TO_SHAPETYPE: - shape.shapeType = GEOJSON_TO_SHAPETYPE[geojType] + shapeType = GEOJSON_TO_SHAPETYPE[geojType] else: raise GeoJSON_Error(f"Cannot create Shape from GeoJSON type '{geojType}'") # set points and parts if geojType == "Point": - shape.points = [geoj["coordinates"]] - shape.parts = [0] + points = [geoj["coordinates"]] + parts = [0] elif geojType in ("MultiPoint", "LineString"): - shape.points = geoj["coordinates"] - shape.parts = [0] - elif geojType in ("Polygon",): + points = geoj["coordinates"] + parts = [0] + elif geojType == "Polygon": points = [] parts = [] index = 0 @@ -892,9 +1006,7 @@ def _from_geojson(geoj) -> Shape: points.extend(ext_or_hole) parts.append(index) index += len(ext_or_hole) - shape.points = points - shape.parts = parts - elif geojType in ("MultiLineString",): + elif geojType == "MultiLineString": points = [] parts = [] index = 0 @@ -902,9 +1014,7 @@ def _from_geojson(geoj) -> Shape: points.extend(linestring) parts.append(index) index += len(linestring) - shape.points = points - shape.parts = parts - elif geojType in ("MultiPolygon",): + elif geojType == "MultiPolygon": points = [] parts = [] index = 0 @@ -922,9 +1032,7 @@ def _from_geojson(geoj) -> Shape: points.extend(ext_or_hole) parts.append(index) index += len(ext_or_hole) - shape.points = points - shape.parts = parts - return shape + return Shape(shapeType=shapeType, points=points, parts=parts) @property def oid(self) -> int: @@ -936,75 +1044,69 @@ def shapeTypeName(self) -> str: return SHAPETYPE_LOOKUP[self.shapeType] def __repr__(self): - return f"Shape #{self.__oid}: {self.shapeTypeName}" - - -S = TypeVar("S", bound=Shape) + class_name = self.__class__.__name__ + if class_name == "Shape": + return f"Shape #{self.__oid}: {self.shapeTypeName}" + return f"{class_name} #{self.__oid}" -def compatible_with(s: Shape, cls: type[S]) -> TypeIs[S]: - return s.shapeType in cls._shapeTypes - - -# pylint: disable=unused-argument # Need unused arguments to keep the same call signature for # different implementations of from_byte_stream and write_to_byte_stream class NullShape(Shape): # Shape.shapeType = NULL already, # to preserve handling of default args in Shape.__init__ # Repeated for clarity. - shapeType = NULL - _shapeTypes = frozenset([NULL]) + def __init__( + self, + oid: Optional[int] = None, + ): + Shape.__init__(self, shapeType=NULL, oid=oid) - @classmethod + @staticmethod def from_byte_stream( - cls, + shapeType: int, b_io: ReadSeekableBinStream, next_shape: int, oid: Optional[int] = None, bbox: Optional[BBox] = None, - ) -> Self: + ) -> NullShape: # Shape.__init__ sets self.points = points or [] - return cls(oid=oid) + return NullShape(oid=oid) @staticmethod def write_to_byte_stream( b_io: WriteableBinStream, s: Shape, i: int, - bbox: Optional[BBox], - mbox: Optional[MBox], - zbox: Optional[ZBox], ) -> int: return 0 +_CanHaveBBox_shapeTypes = frozenset( + [ + POLYLINE, + POLYLINEM, + POLYLINEZ, + MULTIPOINT, + MULTIPOINTM, + MULTIPOINTZ, + POLYGON, + POLYGONM, + POLYGONZ, + MULTIPATCH, + ] +) + + class _CanHaveBBox(Shape): """As well as setting bounding boxes, we also utilize the fact that this mixin applies to all the shapes that are not a single point. """ - _shapeTypes = frozenset( - [ - POLYLINE, - POLYLINEM, - POLYLINEZ, - POLYGON, - POLYGONM, - POLYGONZ, - MULTIPOINT, - MULTIPOINTM, - MULTIPOINTZ, - MULTIPATCH, - ] - ) - - bbox: Optional[BBox] = None - - def _get_set_bbox_from_byte_stream(self, b_io: ReadableBinStream) -> BBox: - self.bbox: BBox = unpack("<4d", b_io.read(32)) - return self.bbox + @staticmethod + def _read_bbox_from_byte_stream(b_io: ReadableBinStream) -> BBox: + return unpack("<4d", b_io.read(32)) @staticmethod def _write_bbox_to_byte_stream( @@ -1020,16 +1122,19 @@ def _write_bbox_to_byte_stream( ) @staticmethod - def _get_npoints_from_byte_stream(b_io: ReadableBinStream) -> int: + def _read_npoints_from_byte_stream(b_io: ReadableBinStream) -> int: return unpack(" int: return b_io.write(pack(" list[Point2D]: flat = unpack(f"<{2 * nPoints}d", b_io.read(16 * nPoints)) - self.points = list(zip(*(iter(flat),) * 2)) + return list(zip(*(iter(flat),) * 2)) @staticmethod def _write_points_to_byte_stream( @@ -1045,35 +1150,19 @@ def _write_points_to_byte_stream( f"Failed to write points for record {i}. Expected floats." ) - @staticmethod - def _get_nparts_from_byte_stream(b_io: ReadableBinStream) -> int: - return 0 - - def _set_parts_from_byte_stream(self, b_io: ReadableBinStream, nParts: int): - pass - - def _set_part_types_from_byte_stream(self, b_io: ReadableBinStream, nParts: int): - pass - - def _set_zs_from_byte_stream(self, b_io: ReadableBinStream, nPoints: int): - pass - - def _set_ms_from_byte_stream( - self, b_io: ReadSeekableBinStream, nPoints: int, next_shape: int - ): - pass - @classmethod def from_byte_stream( cls, + shapeType: int, b_io: ReadSeekableBinStream, next_shape: int, oid: Optional[int] = None, bbox: Optional[BBox] = None, - ) -> Optional[Self]: - shape = cls(oid=oid) + ) -> Optional[Shape]: + ShapeClass = cast(type[_CanHaveBBox], SHAPE_CLASS_FROM_SHAPETYPE[shapeType]) - shape_bbox = shape._get_set_bbox_from_byte_stream(b_io) + kwargs: CanHaveBboxNoLinesKwargs = {"oid": oid} # "shapeType": shapeType} + kwargs["bbox"] = shape_bbox = cls._read_bbox_from_byte_stream(b_io) # if bbox specified and no overlap, skip this shape if bbox is not None and not bbox_overlap(bbox, shape_bbox): @@ -1081,31 +1170,65 @@ def from_byte_stream( # next shape after we return (as done in f.seek(next_shape)) return None - nParts: Optional[int] = shape._get_nparts_from_byte_stream(b_io) - nPoints: int = shape._get_npoints_from_byte_stream(b_io) + nParts: Optional[int] = ( + _CanHaveParts._read_nparts_from_byte_stream(b_io) + if shapeType in _CanHaveParts_shapeTypes + else None + ) + nPoints: int = cls._read_npoints_from_byte_stream(b_io) # Previously, we also set __zmin = __zmax = __mmin = __mmax = None if nParts: - shape._set_parts_from_byte_stream(b_io, nParts) - shape._set_part_types_from_byte_stream(b_io, nParts) + kwargs["parts"] = _CanHaveParts._read_parts_from_byte_stream(b_io, nParts) + if shapeType == MULTIPATCH: + kwargs["partTypes"] = MultiPatch._read_part_types_from_byte_stream( + b_io, nParts + ) + + # else: + # parts = None + # partTypes = None if nPoints: - shape._set_points_from_byte_stream(b_io, nPoints) + kwargs["points"] = cast( + PointsT, cls._read_points_from_byte_stream(b_io, nPoints) + ) - shape._set_zs_from_byte_stream(b_io, nPoints) + if shapeType in _HasZ_shapeTypes: + kwargs["zbox"], kwargs["z"] = _HasZ._read_zs_from_byte_stream( + b_io, nPoints + ) - shape._set_ms_from_byte_stream(b_io, nPoints, next_shape) + if shapeType in _HasM_shapeTypes: + kwargs["mbox"], kwargs["m"] = _HasM._read_ms_from_byte_stream( + b_io, nPoints, next_shape + ) - return shape + # else: + # points = None + # zbox, zs = None, None + # mbox, ms = None, None + + return ShapeClass(**kwargs) + # return ShapeClass( + # shapeType=shapeType, + # # Mypy 1.17.1 doesn't figure out that an Optional[list[Point2D]] is an Optional[list[PointT]] + # points=cast(Optional[PointsT], points), + # parts=parts, + # partTypes=partTypes, + # oid=oid, + # m=ms, + # z=zs, + # bbox=shape_bbox, + # mbox=mbox, + # zbox=zbox, + # ) @staticmethod def write_to_byte_stream( b_io: WriteableBinStream, s: Shape, i: int, - bbox: Optional[BBox], - mbox: Optional[MBox], - zbox: Optional[ZBox], ) -> int: # We use static methods here and below, # to support s only being an instance of a the @@ -1115,58 +1238,69 @@ def write_to_byte_stream( n = 0 - if compatible_with(s, _CanHaveBBox): - n += _CanHaveBBox._write_bbox_to_byte_stream(b_io, i, bbox) + if s.shapeType in _CanHaveBBox_shapeTypes: + n += _CanHaveBBox._write_bbox_to_byte_stream(b_io, i, s.bbox) - if compatible_with(s, _CanHaveParts): - n += _CanHaveParts._write_nparts_to_byte_stream(b_io, s) + if s.shapeType in _CanHaveParts_shapeTypes: + n += _CanHaveParts._write_nparts_to_byte_stream( + b_io, cast(_CanHaveParts, s) + ) # Shape types with multiple points per record - if compatible_with(s, _CanHaveBBox): - n += _CanHaveBBox._write_npoints_to_byte_stream(b_io, s) + if s.shapeType in _CanHaveBBox_shapeTypes: + n += _CanHaveBBox._write_npoints_to_byte_stream(b_io, cast(_CanHaveBBox, s)) # Write part indexes. Includes MultiPatch - if compatible_with(s, _CanHaveParts): - n += _CanHaveParts._write_part_indices_to_byte_stream(b_io, s) + if s.shapeType in _CanHaveParts_shapeTypes: + n += _CanHaveParts._write_part_indices_to_byte_stream( + b_io, cast(_CanHaveParts, s) + ) - if compatible_with(s, MultiPatch): - n += MultiPatch._write_part_types_to_byte_stream(b_io, s) + if s.shapeType in MultiPatch_shapeTypes: + n += MultiPatch._write_part_types_to_byte_stream(b_io, cast(MultiPatch, s)) # Write points for multiple-point records - if compatible_with(s, _CanHaveBBox): - n += _CanHaveBBox._write_points_to_byte_stream(b_io, s, i) - if compatible_with(s, _HasZ): - n += _HasZ._write_zs_to_byte_stream(b_io, s, i, zbox) + if s.shapeType in _CanHaveBBox_shapeTypes: + n += _CanHaveBBox._write_points_to_byte_stream( + b_io, cast(_CanHaveBBox, s), i + ) + if s.shapeType in _HasZ_shapeTypes: + n += _HasZ._write_zs_to_byte_stream(b_io, cast(_HasZ, s), i, s.zbox) - if compatible_with(s, _HasM): - n += _HasM._write_ms_to_byte_stream(b_io, s, i, mbox) + if s.shapeType in _HasM_shapeTypes: + n += _HasM._write_ms_to_byte_stream(b_io, cast(_HasM, s), i, s.mbox) return n +_CanHaveParts_shapeTypes = frozenset( + [ + POLYLINE, + POLYLINEM, + POLYLINEZ, + POLYGON, + POLYGONM, + POLYGONZ, + MULTIPATCH, + ] +) + + class _CanHaveParts(_CanHaveBBox): # The parts attribute is initialised by # the base class Shape's __init__, to parts or []. # "Can Have Parts" should be read as "Can Have non-empty parts". - _shapeTypes = frozenset( - [ - POLYLINE, - POLYLINEM, - POLYLINEZ, - POLYGON, - POLYGONM, - POLYGONZ, - MULTIPATCH, - ] - ) @staticmethod - def _get_nparts_from_byte_stream(b_io: ReadableBinStream) -> int: + def _read_nparts_from_byte_stream(b_io: ReadableBinStream) -> int: return unpack(" int: return b_io.write(pack(" _Array[int]: + return _Array[int]("i", unpack(f"<{nParts}i", b_io.read(nParts * 4))) @staticmethod def _write_part_indices_to_byte_stream( @@ -1175,20 +1309,20 @@ def _write_part_indices_to_byte_stream( return b_io.write(pack(f"<{len(s.parts)}i", *s.parts)) +Point_shapeTypes = frozenset([POINT, POINTM, POINTZ]) + + class Point(Shape): # We also use the fact that the single Point types are the only # shapes that cannot have their own bounding box (a user supplied # bbox is still used to filter out points). - shapeType = POINT - _shapeTypes = frozenset([POINT, POINTM, POINTZ]) - - def _set_single_point_z_from_byte_stream(self, b_io: ReadableBinStream): - pass - - def _set_single_point_m_from_byte_stream( - self, b_io: ReadSeekableBinStream, next_shape: int + def __init__( + self, + x: float, + y: float, + oid: Optional[int] = None, ): - pass + Shape.__init__(self, points=[(x, y)], oid=oid) @staticmethod def _x_y_from_byte_stream(b_io: ReadableBinStream): @@ -1211,13 +1345,12 @@ def _write_x_y_to_byte_stream( @classmethod def from_byte_stream( cls, + shapeType: int, b_io: ReadSeekableBinStream, next_shape: int, oid: Optional[int] = None, bbox: Optional[BBox] = None, - ) -> Optional[Self]: - shape = cls(oid=oid) - + ) -> Optional[Shape]: x, y = cls._x_y_from_byte_stream(b_io) if bbox is not None: @@ -1225,91 +1358,156 @@ def from_byte_stream( # skip shape if no overlap with bounding box if not bbox_overlap(bbox, (x, y, x, y)): return None + elif shapeType == POINT: + return Point(x=x, y=y, oid=oid) - shape.points = [(x, y)] + if shapeType == POINTZ: + z = PointZ._read_single_point_zs_from_byte_stream(b_io)[0] - shape._set_single_point_z_from_byte_stream(b_io) + m = PointM._read_single_point_ms_from_byte_stream(b_io, next_shape)[0] - shape._set_single_point_m_from_byte_stream(b_io, next_shape) + if shapeType == POINTZ: + return PointZ(x=x, y=y, z=z, m=m, oid=oid) - return shape + return PointM(x=x, y=y, m=m, oid=oid) + # return Shape(shapeType=shapeType, points=[(x, y)], z=zs, m=ms, oid=oid) @staticmethod - def write_to_byte_stream( - b_io: WriteableBinStream, - s: Shape, - i: int, - bbox: Optional[BBox], - mbox: Optional[MBox], - zbox: Optional[ZBox], - ) -> int: + def write_to_byte_stream(b_io: WriteableBinStream, s: Shape, i: int) -> int: # Serialize a single point x, y = s.points[0][0], s.points[0][1] n = Point._write_x_y_to_byte_stream(b_io, x, y, i) # Write a single Z value - if compatible_with(s, PointZ): + if s.shapeType in PointZ_shapeTypes: n += PointZ._write_single_point_z_to_byte_stream(b_io, s, i) # Write a single M value - if compatible_with(s, PointM): + if s.shapeType in PointM_shapeTypes: n += PointM._write_single_point_m_to_byte_stream(b_io, s, i) return n -# pylint: enable=unused-argument +Polyline_shapeTypes = frozenset([POLYLINE, POLYLINEM, POLYLINEZ]) class Polyline(_CanHaveParts): - shapeType = POLYLINE - _shapeTypes = frozenset([POLYLINE, POLYLINEM, POLYLINEZ]) + def __init__( + self, + *args: PointsT, + lines: Optional[list[PointsT]] = None, + points: Optional[PointsT] = None, + parts: Optional[list[int]] = None, + bbox: Optional[BBox] = None, + oid: Optional[int] = None, + ): + if args: + if lines: + raise ShapefileException( + "Specify Either: a) positional args, or: b) the keyword arg lines. " + f"Not both. Got both: {args} and {lines=}. " + "If this was intentional, after the other positional args, " + "the arg passed to lines can be unpacked (arg1, arg2, *more_args, *lines, oid=oid,...)" + ) + lines = list(args) + Shape.__init__( + self, + lines=lines, + points=points, + parts=parts, + bbox=bbox, + oid=oid, + ) + + +Polygon_shapeTypes = frozenset([POLYGON, POLYGONM, POLYGONZ]) class Polygon(_CanHaveParts): - shapeType = POLYGON - _shapeTypes = frozenset([POLYGON, POLYGONM, POLYGONZ]) + def __init__( + self, + *args: PointsT, + lines: Optional[list[PointsT]] = None, + parts: Optional[list[int]] = None, + points: Optional[PointsT] = None, + bbox: Optional[BBox] = None, + oid: Optional[int] = None, + ): + lines = list(args) if args else lines + Shape.__init__( + self, + lines=lines, + points=points, + parts=parts, + bbox=bbox, + oid=oid, + ) + + +MultiPoint_shapeTypes = frozenset([MULTIPOINT, MULTIPOINTM, MULTIPOINTZ]) class MultiPoint(_CanHaveBBox): - shapeType = MULTIPOINT - _shapeTypes = frozenset([MULTIPOINT, MULTIPOINTM, MULTIPOINTZ]) + def __init__( + self, + *args: PointT, + points: Optional[PointsT] = None, + bbox: Optional[BBox] = None, + oid: Optional[int] = None, + ): + if args: + if points: + raise ShapefileException( + "Specify Either: a) positional args, or: b) the keyword arg points. " + f"Not both. Got both: {args} and {points=}. " + "If this was intentional, after the other positional args, " + "the arg passed to points can be unpacked, e.g. " + " (arg1, arg2, *more_args, *points, oid=oid,...)" + ) + points = list(args) + Shape.__init__( + self, + points=points, + bbox=bbox, + oid=oid, + ) + + +# Not a PointM or a PointZ +_HasM_shapeTypes = frozenset( + [ + POLYLINEM, + POLYLINEZ, + POLYGONM, + POLYGONZ, + MULTIPOINTM, + MULTIPOINTZ, + MULTIPATCH, + ] +) class _HasM(_CanHaveBBox): - # Not a Point - _shapeTypes = frozenset( - [ - POLYLINEM, - POLYLINEZ, - POLYGONM, - POLYGONZ, - MULTIPOINTM, - MULTIPOINTZ, - MULTIPATCH, - ] - ) m: Sequence[Optional[float]] - def __init__(self, *args, **kwargs): - self.m = [] - super().__init__(*args, **kwargs) - - def _set_ms_from_byte_stream( - self, b_io: ReadSeekableBinStream, nPoints: int, next_shape: int - ): + @staticmethod + def _read_ms_from_byte_stream( + b_io: ReadSeekableBinStream, nPoints: int, next_shape: int + ) -> tuple[MBox, list[Optional[float]]]: if next_shape - b_io.tell() >= 16: - __mmin, __mmax = unpack("<2d", b_io.read(16)) + mbox = unpack("<2d", b_io.read(16)) # Measure values less than -10e38 are nodata values according to the spec if next_shape - b_io.tell() >= nPoints * 8: - self.m = [] + ms = [] for m in unpack(f"<{nPoints}d", b_io.read(nPoints * 8)): if m > NODATA: - self.m.append(m) + ms.append(m) else: - self.m.append(None) + ms.append(None) else: - self.m = [None for _ in range(nPoints)] + ms = [None for _ in range(nPoints)] + return mbox, ms @staticmethod def _write_ms_to_byte_stream( @@ -1327,23 +1525,11 @@ def _write_ms_to_byte_stream( f"Failed to write measure extremes for record {i}. Expected floats" ) try: - if getattr(s, "m", False): - # if m values are stored in attribute - ms = [m if m is not None else NODATA for m in cast(_HasM, s).m] + ms = cast(_HasM, s).m - else: - # if m values are stored as 3rd/4th dimension - # 0-index position of m value is 3 if z type (x,y,z,m), or 2 if m type (x,y,m) - mpos = 3 if s.shapeType in _HasZ._shapeTypes else 2 - ms = [ - cast(float, p[mpos]) - if len(p) > mpos and p[mpos] is not None - else NODATA - for p in s.points - ] - - num_bytes_written += b_io.write(pack(f"<{len(ms)}d", *ms)) + ms_to_encode = [m if m is not None else NODATA for m in ms] + num_bytes_written += b_io.write(pack(f"<{len(ms)}d", *ms_to_encode)) except error: raise ShapefileException( f"Failed to write measure values for record {i}. Expected floats" @@ -1352,25 +1538,26 @@ def _write_ms_to_byte_stream( return num_bytes_written +# Not a PointZ +_HasZ_shapeTypes = frozenset( + [ + POLYLINEZ, + POLYGONZ, + MULTIPOINTZ, + MULTIPATCH, + ] +) + + class _HasZ(_CanHaveBBox): - # Not a Point - _shapeTypes = frozenset( - [ - POLYLINEZ, - POLYGONZ, - MULTIPOINTZ, - MULTIPATCH, - ] - ) z: Sequence[float] - def __init__(self, *args, **kwargs): - self.z = [] - super().__init__(*args, **kwargs) - - def _set_zs_from_byte_stream(self, b_io: ReadableBinStream, nPoints: int): - __zmin, __zmax = unpack("<2d", b_io.read(16)) # pylint: disable=unused-private-member - self.z = _Array[float]("d", unpack(f"<{nPoints}d", b_io.read(nPoints * 8))) + @staticmethod + def _read_zs_from_byte_stream( + b_io: ReadableBinStream, nPoints: int + ) -> tuple[ZBox, Sequence[float]]: + zbox = unpack("<2d", b_io.read(16)) + return zbox, _Array[float]("d", unpack(f"<{nPoints}d", b_io.read(nPoints * 8))) @staticmethod def _write_zs_to_byte_stream( @@ -1388,13 +1575,7 @@ def _write_zs_to_byte_stream( f"Failed to write elevation extremes for record {i}. Expected floats." ) try: - if getattr(s, "z", False): - # if z values are stored in attribute - zs = cast(_HasZ, s).z - else: - # if z values are stored as 3rd dimension - zs = [cast(float, p[2]) if len(p) > 2 else 0 for p in s.points] - + zs = cast(_HasZ, s).z num_bytes_written += b_io.write(pack(f"<{len(zs)}d", *zs)) except error: raise ShapefileException( @@ -1404,105 +1585,230 @@ def _write_zs_to_byte_stream( return num_bytes_written +MultiPatch_shapeTypes = frozenset([MULTIPATCH]) + + class MultiPatch(_HasM, _HasZ, _CanHaveParts): - shapeType = MULTIPATCH - _shapeTypes = frozenset([MULTIPATCH]) + def __init__( + self, + *args: PointsT, + lines: Optional[list[PointsT]] = None, + partTypes: Optional[list[int]] = None, + z: Optional[list[float]] = None, + m: Optional[list[Optional[float]]] = None, + points: Optional[PointsT] = None, + parts: Optional[list[int]] = None, + bbox: Optional[BBox] = None, + mbox: Optional[MBox] = None, + zbox: Optional[ZBox] = None, + oid: Optional[int] = None, + ): + if args: + if lines: + raise ShapefileException( + "Specify Either: a) positional args, or: b) the keyword arg lines. " + f"Not both. Got both: {args} and {lines=}. " + "If this was intentional, after the other positional args, " + "the arg passed to lines can be unpacked (arg1, arg2, *more_args, *lines, oid=oid,...)" + ) + lines = list(args) + Shape.__init__( + self, + lines=lines, + points=points, + parts=parts, + partTypes=partTypes, + z=z, + m=m, + bbox=bbox, + zbox=zbox, + mbox=mbox, + oid=oid, + ) - def _set_part_types_from_byte_stream(self, b_io: ReadableBinStream, nParts: int): - self.partTypes = _Array[int]("i", unpack(f"<{nParts}i", b_io.read(nParts * 4))) + @staticmethod + def _read_part_types_from_byte_stream( + b_io: ReadableBinStream, nParts: int + ) -> Sequence[int]: + return _Array[int]("i", unpack(f"<{nParts}i", b_io.read(nParts * 4))) @staticmethod def _write_part_types_to_byte_stream(b_io: WriteableBinStream, s: Shape) -> int: return b_io.write(pack(f"<{len(s.partTypes)}i", *s.partTypes)) -class PointM(Point): - shapeType = POINTM - _shapeTypes = frozenset([POINTM, POINTZ]) +PointM_shapeTypes = frozenset([POINTM, POINTZ]) - # same default as in Writer.__shpRecord (if s.shapeType in (11, 21):) - # PyShp encodes None m values as NODATA - m = (None,) - def _set_single_point_m_from_byte_stream( - self, b_io: ReadSeekableBinStream, next_shape: int +class PointM(Point): + def __init__( + self, + x: float, + y: float, + # same default as in Writer.__shpRecord (if s.shapeType in (11, 21):) + # PyShp encodes None m values as NODATA + m: Optional[float] = None, + oid: Optional[int] = None, ): + Shape.__init__(self, points=[(x, y)], m=(m,), oid=oid) + + @staticmethod + def _read_single_point_ms_from_byte_stream( + b_io: ReadSeekableBinStream, next_shape: int + ) -> tuple[Optional[float]]: if next_shape - b_io.tell() >= 8: (m,) = unpack(" NODATA: - self.m = (m,) + return (m,) else: - self.m = (None,) + return (None,) @staticmethod def _write_single_point_m_to_byte_stream( b_io: WriteableBinStream, s: Shape, i: int ) -> int: - # Write a single M value + try: + s = cast(_HasM, s) + m = s.m[0] if s.m else None + except error: + raise ShapefileException( + f"Failed to write measure value for record {i}. Expected floats." + ) + # Note: missing m values are autoset to NODATA. + m_to_encode = m if m is not None else NODATA - if getattr(s, "m", False): - # if m values are stored in attribute - try: - # if not s.m or s.m[0] is None: - # s.m = (NODATA,) - # m = s.m[0] - s = cast(_HasM, s) - m = s.m[0] if s.m and s.m[0] is not None else NODATA - except error: - raise ShapefileException( - f"Failed to write measure value for record {i}. Expected floats." - ) - else: - # if m values are stored as 3rd/4th dimension - # 0-index position of m value is 3 if z type (x,y,z,m), or 2 if m type (x,y,m) - try: - mpos = 3 if s.shapeType == POINTZ else 2 - if len(s.points[0]) < mpos + 1: - # s.points[0].append(NODATA) - m = NODATA - elif s.points[0][mpos] is None: - # s.points[0][mpos] = NODATA - m = NODATA - else: - m = cast(float, s.points[0][mpos]) + return b_io.write(pack("<1d", m_to_encode)) - except error: - raise ShapefileException( - f"Failed to write measure value for record {i}. Expected floats." - ) - return b_io.write(pack("<1d", m)) +PolylineM_shapeTypes = frozenset([POLYLINEM, POLYLINEZ]) class PolylineM(Polyline, _HasM): - shapeType = POLYLINEM - _shapeTypes = frozenset([POLYLINEM, POLYLINEZ]) + def __init__( + self, + *args: PointsT, + lines: Optional[list[PointsT]] = None, + parts: Optional[list[int]] = None, + m: Optional[Sequence[Optional[float]]] = None, + points: Optional[PointsT] = None, + bbox: Optional[BBox] = None, + mbox: Optional[MBox] = None, + oid: Optional[int] = None, + ): + if args: + if lines: + raise ShapefileException( + "Specify Either: a) positional args, or: b) the keyword arg lines. " + f"Not both. Got both: {args} and {lines=}. " + "If this was intentional, after the other positional args, " + "the arg passed to lines can be unpacked (arg1, arg2, *more_args, *lines, oid=oid,...)" + ) + lines = list(args) + Shape.__init__( + self, + lines=lines, + points=points, + parts=parts, + m=m, + bbox=bbox, + mbox=mbox, + oid=oid, + ) + + +PolygonM_shapeTypes = frozenset([POLYGONM, POLYGONZ]) class PolygonM(Polygon, _HasM): - shapeType = POLYGONM - _shapeTypes = frozenset([POLYGONM, POLYGONZ]) + def __init__( + self, + *args: PointsT, + lines: Optional[list[PointsT]] = None, + parts: Optional[list[int]] = None, + m: Optional[list[Optional[float]]] = None, + points: Optional[PointsT] = None, + bbox: Optional[BBox] = None, + mbox: Optional[MBox] = None, + oid: Optional[int] = None, + ): + if args: + if lines: + raise ShapefileException( + "Specify Either: a) positional args, or: b) the keyword arg lines. " + f"Not both. Got both: {args} and {lines=}. " + "If this was intentional, after the other positional args, " + "the arg passed to lines can be unpacked (arg1, arg2, *more_args, *lines, oid=oid,...)" + ) + lines = list(args) + Shape.__init__( + self, + lines=lines, + points=points, + parts=parts, + m=m, + bbox=bbox, + mbox=mbox, + oid=oid, + ) + + +MultiPointM_shapeTypes = frozenset([MULTIPOINTM, MULTIPOINTZ]) class MultiPointM(MultiPoint, _HasM): - shapeType = MULTIPOINTM + def __init__( + self, + *args: PointT, + points: Optional[PointsT] = None, + m: Optional[Sequence[Optional[float]]] = None, + bbox: Optional[BBox] = None, + mbox: Optional[MBox] = None, + oid: Optional[int] = None, + ): + if args: + if points: + raise ShapefileException( + "Specify Either: a) positional args, or: b) the keyword arg points. " + f"Not both. Got both: {args} and {points=}. " + "If this was intentional, after the other positional args, " + "the arg passed to points can be unpacked, e.g. " + " (arg1, arg2, *more_args, *points, oid=oid,...)" + ) + points = list(args) + Shape.__init__( + self, + points=points, + m=m, + bbox=bbox, + mbox=mbox, + oid=oid, + ) + - _shapeTypes = frozenset([MULTIPOINTM, MULTIPOINTZ]) +PointZ_shapeTypes = frozenset([POINTZ]) class PointZ(PointM): - shapeType = POINTZ - _shapeTypes = frozenset([POINTZ]) + def __init__( + self, + x: float, + y: float, + z: float = 0.0, + m: Optional[float] = None, + oid: Optional[int] = None, + ): + Shape.__init__(self, points=[(x, y)], z=(z,), m=(m,), oid=oid) # same default as in Writer.__shpRecord (if s.shapeType == 11:) z: Sequence[float] = (0.0,) - def _set_single_point_z_from_byte_stream(self, b_io: ReadableBinStream): - self.z = tuple(unpack(" tuple[float]: + return unpack("= 3 and s.points[0][2] is not None: - z = s.points[0][2] - except error: - raise ShapefileException( - f"Failed to write elevation value for record {i}. Expected floats." - ) + + try: + if s.z: + z = s.z[0] + except error: + raise ShapefileException( + f"Failed to write elevation value for record {i}. Expected floats." + ) return b_io.write(pack(" WriteSeekableBinStream: ... @overload - def __getFileObj(self, f: None) -> Never: ... + def __getFileObj(self, f: None) -> NoReturn: ... @overload def __getFileObj(self, f: WriteSeekableBinStream) -> WriteSeekableBinStream: ... def __getFileObj(self, f): @@ -2964,25 +3359,18 @@ def __shpFileLength(self) -> int: shp.seek(start) return size - def __bbox(self, s: Shape) -> BBox: - xs: list[float] = [] - ys: list[float] = [] - - if not s.points: - # this should not happen. - # any shape that is not null should have at least one point, and only those should be sent here. - # could also mean that earlier code failed to add points to a non-null shape. - raise ShapefileException( - "Cannot create bbox. Expected a valid shape with at least one point. " - f"Got a shape of type {s.shapeType=} and 0 points." - ) + def _update_file_bbox(self, s: Shape): + if s.shapeType == NULL: + shape_bbox = None + elif s.shapeType in _CanHaveBBox_shapeTypes: + shape_bbox = s.bbox + else: + x, y = s.points[0][:2] + shape_bbox = (x, y, x, y) - for point in s.points: - xs.append(point[0]) - ys.append(point[1]) + if shape_bbox is None: + return - shape_bbox = (min(xs), min(ys), max(xs), max(ys)) - # update global if self._bbox: # compare with existing self._bbox = ( @@ -2994,50 +3382,23 @@ def __bbox(self, s: Shape) -> BBox: else: # first time bbox is being set self._bbox = shape_bbox - return shape_bbox - def __zbox(self, s: Union[_HasZ, PointZ]) -> ZBox: - shape_zs: list[float] = [] - if s.z: - shape_zs.extend(s.z) - else: - for p in s.points: - # On a ShapeZ type, M is at index 4, and the point can be a 3-tuple or 4-tuple. - z = p[2] if len(p) >= 3 and p[2] is not None else 0 - shape_zs.append(z) - zbox = (min(shape_zs), max(shape_zs)) - # update global + def _update_file_zbox(self, s: Union[_HasZ, PointZ]): if self._zbox: # compare with existing - self._zbox = (min(zbox[0], self._zbox[0]), max(zbox[1], self._zbox[1])) + self._zbox = (min(s.zbox[0], self._zbox[0]), max(s.zbox[1], self._zbox[1])) else: # first time zbox is being set - self._zbox = zbox - return zbox - - def __mbox(self, s: Union[_HasM, PointM]) -> MBox: - mpos = 3 if s.shapeType in _HasZ._shapeTypes | PointZ._shapeTypes else 2 - shape_ms: list[float] = [] - if s.m: - shape_ms.extend(m for m in s.m if m is not None) - else: - for p in s.points: - m = p[mpos] if len(p) >= mpos + 1 else None - if m is not None: - shape_ms.append(m) + self._zbox = s.zbox - if not shape_ms: - # only if none of the shapes had m values, should mbox be set to missing m values - shape_ms.append(NODATA) - mbox = (min(shape_ms), max(shape_ms)) - # update global + def _update_file_mbox(self, s: Union[_HasM, PointM]): + mbox = s.mbox if self._mbox: # compare with existing self._mbox = (min(mbox[0], self._mbox[0]), max(mbox[1], self._mbox[1])) else: # first time mbox is being set self._mbox = mbox - return mbox @property def shapeTypeName(self) -> str: @@ -3098,7 +3459,7 @@ def __shapefileHeader( else: f.write(pack("<4d", 0, 0, 0, 0)) # Elevation - if self.shapeType in PointZ._shapeTypes | _HasZ._shapeTypes: + if self.shapeType in PointZ_shapeTypes | _HasZ_shapeTypes: # Z values are present in Z type zbox = self.zbox() if zbox is None: @@ -3110,7 +3471,7 @@ def __shapefileHeader( # zbox = ZBox(0, 0) zbox = (0, 0) # Measure - if self.shapeType in PointM._shapeTypes | _HasM._shapeTypes: + if self.shapeType in PointM_shapeTypes | _HasM_shapeTypes: # M values are present in M or Z type mbox = self.mbox() if mbox is None: @@ -3219,17 +3580,14 @@ def __shpRecord(self, s: Shape) -> tuple[int, int]: # For both single point and multiple-points non-null shapes, # update bbox, mbox and zbox of the whole shapefile - shape_bbox = self.__bbox(s) if s.shapeType != NULL else None + if s.shapeType != NULL: + self._update_file_bbox(s) - if s.shapeType in PointM._shapeTypes | _HasM._shapeTypes: - shape_mbox = self.__mbox(cast(Union[_HasM, PointM], s)) - else: - shape_mbox = None + if s.shapeType in PointM_shapeTypes | _HasM_shapeTypes: + self._update_file_mbox(cast(Union[_HasM, PointM], s)) - if s.shapeType in PointZ._shapeTypes | _HasZ._shapeTypes: - shape_zbox = self.__zbox(cast(Union[_HasZ, PointZ], s)) - else: - shape_zbox = None + if s.shapeType in PointZ_shapeTypes | _HasZ_shapeTypes: + self._update_file_zbox(cast(Union[_HasZ, PointZ], s)) # Create an in-memory binary buffer to avoid # unnecessary seeks to files on disk @@ -3252,9 +3610,6 @@ def __shpRecord(self, s: Shape) -> tuple[int, int]: b_io=b_io, s=s, i=self.shpNum, - bbox=shape_bbox, - mbox=shape_mbox, - zbox=shape_zbox, ) # Finalize record length as 16-bit words @@ -3320,7 +3675,7 @@ def record( record = ["" for _ in range(fieldCount)] self.__dbfRecord(record) - def __dbfRecord(self, record): + def __dbfRecord(self, record: list[RecordValue]) -> None: """Writes the dbf records.""" f = self.__getFileObj(self.dbf) if self.recNum == 0: @@ -3351,15 +3706,15 @@ def __dbfRecord(self, record): # first try to force directly to int. # forcing a large int to float and back to int # will lose information and result in wrong nr. - num_val = int(value) + num_val = int(cast(int, value)) except ValueError: # forcing directly to int failed, so was probably a float. - num_val = int(float(value)) + num_val = int(float(cast(float, value))) str_val = format(num_val, "d")[:size].rjust( size ) # caps the size if exceeds the field size else: - f_val = float(value) + f_val = float(cast(float, value)) str_val = format(f_val, f".{deci}f")[:size].rjust( size ) # caps the size if exceeds the field size @@ -3428,15 +3783,13 @@ def null(self) -> None: def point(self, x: float, y: float) -> None: """Creates a POINT shape.""" - pointShape = Point() - pointShape.points.append((x, y)) + pointShape = Point(x, y) self.shape(pointShape) def pointm(self, x: float, y: float, m: Optional[float] = None) -> None: """Creates a POINTM shape. If the m (measure) value is not set, it defaults to NoData.""" - pointShape = PointM() - pointShape.points.append((x, y, m)) + pointShape = PointM(x, y, m) self.shape(pointShape) def pointz( @@ -3445,22 +3798,23 @@ def pointz( """Creates a POINTZ shape. If the z (elevation) value is not set, it defaults to 0. If the m (measure) value is not set, it defaults to NoData.""" - pointShape = PointZ() - pointShape.points.append((x, y, z, m)) + pointShape = PointZ(x, y, z, m) self.shape(pointShape) def multipoint(self, points: PointsT) -> None: """Creates a MULTIPOINT shape. Points is a list of xy values.""" # nest the points inside a list to be compatible with the generic shapeparts method - self._shapeparts(parts=[points], polyShape=MultiPoint()) + shape = MultiPoint(points=points) + self.shape(shape) def multipointm(self, points: PointsT) -> None: """Creates a MULTIPOINTM shape. Points is a list of xym values. If the m (measure) value is not included, it defaults to None (NoData).""" # nest the points inside a list to be compatible with the generic shapeparts method - self._shapeparts(parts=[points], polyShape=MultiPointM()) + shape = MultiPointM(points=points) + self.shape(shape) def multipointz(self, points: PointsT) -> None: """Creates a MULTIPOINTZ shape. @@ -3468,32 +3822,37 @@ def multipointz(self, points: PointsT) -> None: If the z (elevation) value is not included, it defaults to 0. If the m (measure) value is not included, it defaults to None (NoData).""" # nest the points inside a list to be compatible with the generic shapeparts method - self._shapeparts(parts=[points], polyShape=MultiPointZ()) + shape = MultiPointZ(points=points) + self.shape(shape) def line(self, lines: list[PointsT]) -> None: """Creates a POLYLINE shape. Lines is a collection of lines, each made up of a list of xy values.""" - self._shapeparts(parts=lines, polyShape=Polyline()) + shape = Polyline(lines=lines) + self.shape(shape) def linem(self, lines: list[PointsT]) -> None: """Creates a POLYLINEM shape. Lines is a collection of lines, each made up of a list of xym values. If the m (measure) value is not included, it defaults to None (NoData).""" - self._shapeparts(parts=lines, polyShape=PolylineM()) + shape = PolylineM(lines=lines) + self.shape(shape) def linez(self, lines: list[PointsT]) -> None: """Creates a POLYLINEZ shape. Lines is a collection of lines, each made up of a list of xyzm values. If the z (elevation) value is not included, it defaults to 0. If the m (measure) value is not included, it defaults to None (NoData).""" - self._shapeparts(parts=lines, polyShape=PolylineZ()) + shape = PolylineZ(lines=lines) + self.shape(shape) def poly(self, polys: list[PointsT]) -> None: """Creates a POLYGON shape. Polys is a collection of polygons, each made up of a list of xy values. Note that for ordinary polygons the coordinates must run in a clockwise direction. If some of the polygons are holes, these must run in a counterclockwise direction.""" - self._shapeparts(parts=polys, polyShape=Polygon()) + shape = Polygon(lines=polys) + self.shape(shape) def polym(self, polys: list[PointsT]) -> None: """Creates a POLYGONM shape. @@ -3501,7 +3860,8 @@ def polym(self, polys: list[PointsT]) -> None: Note that for ordinary polygons the coordinates must run in a clockwise direction. If some of the polygons are holes, these must run in a counterclockwise direction. If the m (measure) value is not included, it defaults to None (NoData).""" - self._shapeparts(parts=polys, polyShape=PolygonM()) + shape = PolygonM(lines=polys) + self.shape(shape) def polyz(self, polys: list[PointsT]) -> None: """Creates a POLYGONZ shape. @@ -3510,7 +3870,8 @@ def polyz(self, polys: list[PointsT]) -> None: If some of the polygons are holes, these must run in a counterclockwise direction. If the z (elevation) value is not included, it defaults to 0. If the m (measure) value is not included, it defaults to None (NoData).""" - self._shapeparts(parts=polys, polyShape=PolygonZ()) + shape = PolygonZ(lines=polys) + self.shape(shape) def multipatch(self, parts: list[PointsT], partTypes: list[int]) -> None: """Creates a MULTIPATCH shape. @@ -3520,52 +3881,8 @@ def multipatch(self, parts: list[PointsT], partTypes: list[int]) -> None: TRIANGLE_FAN, OUTER_RING, INNER_RING, FIRST_RING, or RING. If the z (elevation) value is not included, it defaults to 0. If the m (measure) value is not included, it defaults to None (NoData).""" - polyShape = MultiPatch() - polyShape.parts = [] - polyShape.points = [] - for part in parts: - # set part index position - polyShape.parts.append(len(polyShape.points)) - # add points - # for point in part: - # # Ensure point is list - # if not isinstance(point, list): - # point = list(point) - # polyShape.points.append(point) - polyShape.points.extend(part) - polyShape.partTypes = partTypes - # write the shape - self.shape(polyShape) - - def _shapeparts( - self, parts: list[PointsT], polyShape: Union[Polyline, Polygon, MultiPoint] - ) -> None: - """Internal method for adding a shape that has multiple collections of points (parts): - lines, polygons, and multipoint shapes. - """ - polyShape.parts = [] - polyShape.points = [] - # Make sure polygon rings (parts) are closed - - # if shapeType in (5, 15, 25, 31): - # This method is never actually called on a MultiPatch - # so we omit its shapeType (31) for efficiency - if compatible_with(polyShape, Polygon): - for part in parts: - if part[0] != part[-1]: - part.append(part[0]) - # Add points and part indexes - for part in parts: - # set part index position - polyShape.parts.append(len(polyShape.points)) - # add points - # for point in part: - # # Ensure point is list - # point_list = list(point) - # polyShape.points.append(point_list) - polyShape.points.extend(part) - # write the shape - self.shape(polyShape) + shape = MultiPatch(lines=parts, partTypes=partTypes) + self.shape(shape) def field( # Types of args should match *Field @@ -3611,7 +3928,7 @@ def _filter_network_doctests( examples_it = iter(examples) - yield next(examples_it) # pylint: disable=stop-iteration-return + yield next(examples_it) for example in examples_it: # Track variables in doctest shell sessions defined from commands