diff --git a/Cargo.lock b/Cargo.lock index ede8317..3788d6d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -659,7 +659,7 @@ dependencies = [ "log", "object_store", "parking_lot", - "rand", + "rand 0.9.2", "regex", "sqlparser", "tempfile", @@ -774,7 +774,7 @@ dependencies = [ "itertools 0.14.0", "log", "object_store", - "rand", + "rand 0.9.2", "tokio", "url", ] @@ -900,7 +900,7 @@ dependencies = [ "log", "object_store", "parking_lot", - "rand", + "rand 0.9.2", "tempfile", "url", ] @@ -961,7 +961,7 @@ dependencies = [ "itertools 0.14.0", "log", "num-traits", - "rand", + "rand 0.9.2", "regex", "unicode-segmentation", "uuid", @@ -1640,7 +1640,7 @@ dependencies = [ [[package]] name = "geodatafusion" -version = "0.3.0" +version = "0.4.0-hotdata.1" dependencies = [ "approx", "arrow-arith", @@ -1653,7 +1653,10 @@ dependencies = [ "geoarrow-expr-geo", "geoarrow-schema", "geohash", + "geojson", "object_store", + "serde_json", + "spatialbench", "thiserror 1.0.69", "tokio", "wkt 0.14.0", @@ -1661,7 +1664,7 @@ dependencies = [ [[package]] name = "geodatafusion-flatgeobuf" -version = "0.3.0" +version = "0.4.0-hotdata.1" dependencies = [ "approx", "arrow-array", @@ -1685,7 +1688,7 @@ dependencies = [ [[package]] name = "geodatafusion-geojson" -version = "0.3.0" +version = "0.4.0-hotdata.1" dependencies = [ "arrow-array", "arrow-schema", @@ -1707,7 +1710,7 @@ dependencies = [ [[package]] name = "geodatafusion-geoparquet" -version = "0.3.0" +version = "0.4.0-hotdata.1" dependencies = [ "approx", "arrow-schema", @@ -2548,7 +2551,7 @@ dependencies = [ "parking_lot", "percent-encoding", "quick-xml", - "rand", + "rand 0.9.2", "reqwest", "ring", "serde", @@ -2785,7 +2788,7 @@ dependencies = [ "bytes", "getrandom 0.3.3", "lru-slab", - "rand", + "rand 0.9.2", "ring", "rustc-hash", "rustls", @@ -2826,14 +2829,35 @@ version = "5.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "69cdb34c158ceb288df11e18b4bd39de994f6657d83847bdffdbd7f346754b0f" +[[package]] +name = "rand" +version = "0.8.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "34af8d1a0e25924bc5b7c43c079c942339d8f0a8b57c39049bef581b46327404" +dependencies = [ + "libc", + "rand_chacha 0.3.1", + "rand_core 0.6.4", +] + [[package]] name = "rand" version = "0.9.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6db2770f06117d490610c7488547d543617b21bfa07796d7a12f6f1bd53850d1" dependencies = [ - "rand_chacha", - "rand_core", + "rand_chacha 0.9.0", + "rand_core 0.9.3", +] + +[[package]] +name = "rand_chacha" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e6c10a63a0fa32252be49d21e7709d4d4baf8d231c2dbce1eaa8141b9b127d88" +dependencies = [ + "ppv-lite86", + "rand_core 0.6.4", ] [[package]] @@ -2843,7 +2867,16 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d3022b5f1df60f26e1ffddd6c66e8aa15de382ae63b3a0c1bfc0e4d3e3f325cb" dependencies = [ "ppv-lite86", - "rand_core", + "rand_core 0.9.3", +] + +[[package]] +name = "rand_core" +version = "0.6.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ec0be4795e2f6a28069bec0b5ff3e2ac9bafc99e6a9a7dc3547996c5c816922c" +dependencies = [ + "getrandom 0.2.16", ] [[package]] @@ -3335,6 +3368,18 @@ dependencies = [ "smallvec", ] +[[package]] +name = "spatialbench" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "07f3f4b67ccf571f183d3695aa6b9d6f996864c31782a480e97a23ed0f2f6f18" +dependencies = [ + "geo", + "once_cell", + "rand 0.8.5", + "serde", +] + [[package]] name = "sqlparser" version = "0.59.0" diff --git a/rust/geodatafusion/Cargo.toml b/rust/geodatafusion/Cargo.toml index 5fb3ddc..2e59cd1 100644 --- a/rust/geodatafusion/Cargo.toml +++ b/rust/geodatafusion/Cargo.toml @@ -21,12 +21,15 @@ geoarrow-array = { workspace = true } geoarrow-expr-geo = { workspace = true } geoarrow-schema = { workspace = true } geohash = { workspace = true } +geojson = { workspace = true } +serde_json = { workspace = true } thiserror = { workspace = true } wkt = { workspace = true } [dev-dependencies] approx = { workspace = true } datafusion = { workspace = true, features = ["sql"] } +spatialbench = { workspace = true } geo-traits = { workspace = true } geoarrow-array = { workspace = true, features = ["test-data"] } object_store = { workspace = true, features = ["http"] } diff --git a/rust/geodatafusion/src/udf/geo/measurement/advanced.rs b/rust/geodatafusion/src/udf/geo/measurement/advanced.rs new file mode 100644 index 0000000..f08f0f9 --- /dev/null +++ b/rust/geodatafusion/src/udf/geo/measurement/advanced.rs @@ -0,0 +1,1067 @@ +use std::any::Any; +use std::sync::{Arc, LazyLock, OnceLock}; + +use arrow_array::Float64Array; +use arrow_array::builder::Float64Builder; +use arrow_schema::{DataType, FieldRef}; +use datafusion::error::{DataFusionError, Result}; +use datafusion::logical_expr::scalar_doc_sections::DOC_SECTION_OTHER; +use datafusion::logical_expr::{ + ColumnarValue, Documentation, ReturnFieldArgs, ScalarFunctionArgs, ScalarUDFImpl, Signature, + Volatility, +}; +use geoarrow_array::array::from_arrow_array; +use geoarrow_array::{GeoArrowArray, GeoArrowArrayAccessor, downcast_geoarrow_array}; +use geoarrow_expr_geo::util::to_geo::geometry_to_geo; +use geoarrow_schema::error::GeoArrowResult; +use geoarrow_schema::{CoordType, Dimension, Metadata, PointType}; + +use crate::data_types::any_single_geometry_type_input; +use crate::error::GeoDataFusionResult; + +// --------------------------------------------------------------------------- +// ST_Perimeter +// --------------------------------------------------------------------------- + +#[derive(Debug, Eq, PartialEq, Hash)] +pub struct Perimeter; + +impl Perimeter { + pub fn new() -> Self { + Self {} + } +} + +impl Default for Perimeter { + fn default() -> Self { + Self::new() + } +} + +static PERIMETER_ALIASES: LazyLock> = + LazyLock::new(|| vec!["st_perimeter2d".to_string()]); +static PERIMETER_DOC: OnceLock = OnceLock::new(); + +impl ScalarUDFImpl for Perimeter { + fn as_any(&self) -> &dyn Any { + self + } + + fn name(&self) -> &str { + "st_perimeter" + } + + fn aliases(&self) -> &[String] { + PERIMETER_ALIASES.as_slice() + } + + fn signature(&self) -> &Signature { + any_single_geometry_type_input() + } + + fn return_type(&self, _arg_types: &[DataType]) -> Result { + Ok(DataType::Float64) + } + + fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result { + Ok(perimeter_impl(args)?) + } + + fn documentation(&self) -> Option<&Documentation> { + Some(PERIMETER_DOC.get_or_init(|| { + Documentation::builder( + DOC_SECTION_OTHER, + "Returns the 2D perimeter of a polygon or multipolygon geometry. For non-areal geometries returns 0.", + "ST_Perimeter(geom)", + ) + .with_argument("geom", "geometry") + .build() + })) + } +} + +fn perimeter_impl(args: ScalarFunctionArgs) -> GeoDataFusionResult { + let arrays = ColumnarValue::values_to_arrays(&args.args)?; + let geo_array = from_arrow_array(&arrays[0], &args.arg_fields[0])?; + let result = perimeter_array(&geo_array)?; + Ok(ColumnarValue::Array(Arc::new(result))) +} + +fn perimeter_array(array: &dyn GeoArrowArray) -> GeoArrowResult { + downcast_geoarrow_array!(array, _perimeter_typed) +} + +fn _perimeter_typed<'a>( + array: &'a impl GeoArrowArrayAccessor<'a>, +) -> GeoArrowResult { + use geo::EuclideanLength; + + let mut builder = Float64Builder::with_capacity(array.len()); + for geom_opt in array.iter() { + match geom_opt { + Some(geom) => { + let geo_geom = geometry_to_geo(&geom?)?; + let perimeter = match &geo_geom { + geo::Geometry::Polygon(p) => { + let exterior = p.exterior().euclidean_length(); + let interiors: f64 = + p.interiors().iter().map(|r| r.euclidean_length()).sum(); + exterior + interiors + } + geo::Geometry::MultiPolygon(mp) => { + mp.0.iter() + .map(|p| { + let exterior = p.exterior().euclidean_length(); + let interiors: f64 = + p.interiors().iter().map(|r| r.euclidean_length()).sum(); + exterior + interiors + }) + .sum() + } + _ => 0.0, + }; + builder.append_value(perimeter); + } + None => builder.append_null(), + } + } + Ok(builder.finish()) +} + +// --------------------------------------------------------------------------- +// ST_Azimuth +// --------------------------------------------------------------------------- + +#[derive(Debug, Eq, PartialEq, Hash)] +pub struct Azimuth; + +impl Azimuth { + pub fn new() -> Self { + Self {} + } +} + +impl Default for Azimuth { + fn default() -> Self { + Self::new() + } +} + +static AZIMUTH_SIGNATURE: LazyLock = + LazyLock::new(|| Signature::any(2, Volatility::Immutable)); +static AZIMUTH_DOC: OnceLock = OnceLock::new(); + +impl ScalarUDFImpl for Azimuth { + fn as_any(&self) -> &dyn Any { + self + } + + fn name(&self) -> &str { + "st_azimuth" + } + + fn signature(&self) -> &Signature { + &AZIMUTH_SIGNATURE + } + + fn return_type(&self, _arg_types: &[DataType]) -> Result { + Ok(DataType::Float64) + } + + fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result { + Ok(azimuth_impl(args)?) + } + + fn documentation(&self) -> Option<&Documentation> { + Some(AZIMUTH_DOC.get_or_init(|| { + Documentation::builder( + DOC_SECTION_OTHER, + "Returns the azimuth in radians of the segment defined by the two given point geometries. The azimuth is measured clockwise from north (0 radians).", + "ST_Azimuth(pointA, pointB)", + ) + .with_argument("pointA", "geometry (point)") + .with_argument("pointB", "geometry (point)") + .build() + })) + } +} + +fn azimuth_impl(args: ScalarFunctionArgs) -> GeoDataFusionResult { + let arrays = ColumnarValue::values_to_arrays(&args.args)?; + let left_arr = from_arrow_array(&arrays[0], &args.arg_fields[0])?; + let right_arr = from_arrow_array(&arrays[1], &args.arg_fields[1])?; + let result = azimuth_arrays(&left_arr, &right_arr)?; + Ok(ColumnarValue::Array(Arc::new(result))) +} + +fn azimuth_arrays( + left: &dyn GeoArrowArray, + right: &dyn GeoArrowArray, +) -> GeoArrowResult { + downcast_geoarrow_array!(left, _azimuth_left, right) +} + +fn _azimuth_left<'a>( + left: &'a impl GeoArrowArrayAccessor<'a>, + right: &dyn GeoArrowArray, +) -> GeoArrowResult { + downcast_geoarrow_array!(right, _azimuth_both, left) +} + +fn _azimuth_both<'a, 'b>( + right: &'b impl GeoArrowArrayAccessor<'b>, + left: &'a impl GeoArrowArrayAccessor<'a>, +) -> GeoArrowResult { + let mut builder = Float64Builder::with_capacity(left.len()); + for (l, r) in left.iter().zip(right.iter()) { + match (l, r) { + (Some(lg), Some(rg)) => { + let left_geo = geometry_to_geo(&lg?)?; + let right_geo = geometry_to_geo(&rg?)?; + match (&left_geo, &right_geo) { + (geo::Geometry::Point(pa), geo::Geometry::Point(pb)) => { + let dx = pb.x() - pa.x(); + let dy = pb.y() - pa.y(); + // atan2(dx, dy) gives angle from north (positive Y), + // clockwise positive + let mut azimuth = dx.atan2(dy); + if azimuth < 0.0 { + azimuth += 2.0 * std::f64::consts::PI; + } + builder.append_value(azimuth); + } + _ => builder.append_null(), + } + } + _ => builder.append_null(), + } + } + Ok(builder.finish()) +} + +// --------------------------------------------------------------------------- +// ST_HausdorffDistance +// --------------------------------------------------------------------------- + +#[derive(Debug, Eq, PartialEq, Hash)] +pub struct HausdorffDistance; + +impl HausdorffDistance { + pub fn new() -> Self { + Self {} + } +} + +impl Default for HausdorffDistance { + fn default() -> Self { + Self::new() + } +} + +static HAUSDORFF_SIGNATURE: LazyLock = + LazyLock::new(|| Signature::any(2, Volatility::Immutable)); +static HAUSDORFF_DOC: OnceLock = OnceLock::new(); + +impl ScalarUDFImpl for HausdorffDistance { + fn as_any(&self) -> &dyn Any { + self + } + + fn name(&self) -> &str { + "st_hausdorffdistance" + } + + fn signature(&self) -> &Signature { + &HAUSDORFF_SIGNATURE + } + + fn return_type(&self, _arg_types: &[DataType]) -> Result { + Ok(DataType::Float64) + } + + fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result { + Ok(hausdorff_impl(args)?) + } + + fn documentation(&self) -> Option<&Documentation> { + Some(HAUSDORFF_DOC.get_or_init(|| { + Documentation::builder( + DOC_SECTION_OTHER, + "Returns the Hausdorff distance between two geometries. The Hausdorff distance is the greatest distance between any point in one geometry and the closest point in the other geometry.", + "ST_HausdorffDistance(geomA, geomB)", + ) + .with_argument("geomA", "geometry") + .with_argument("geomB", "geometry") + .build() + })) + } +} + +fn hausdorff_impl(args: ScalarFunctionArgs) -> GeoDataFusionResult { + let arrays = ColumnarValue::values_to_arrays(&args.args)?; + let left_arr = from_arrow_array(&arrays[0], &args.arg_fields[0])?; + let right_arr = from_arrow_array(&arrays[1], &args.arg_fields[1])?; + let result = hausdorff_arrays(&left_arr, &right_arr)?; + Ok(ColumnarValue::Array(Arc::new(result))) +} + +fn hausdorff_arrays( + left: &dyn GeoArrowArray, + right: &dyn GeoArrowArray, +) -> GeoArrowResult { + downcast_geoarrow_array!(left, _hausdorff_left, right) +} + +fn _hausdorff_left<'a>( + left: &'a impl GeoArrowArrayAccessor<'a>, + right: &dyn GeoArrowArray, +) -> GeoArrowResult { + downcast_geoarrow_array!(right, _hausdorff_both, left) +} + +fn _hausdorff_both<'a, 'b>( + right: &'b impl GeoArrowArrayAccessor<'b>, + left: &'a impl GeoArrowArrayAccessor<'a>, +) -> GeoArrowResult { + use geo::HausdorffDistance as GeoHausdorff; + + let mut builder = Float64Builder::with_capacity(left.len()); + for (l, r) in left.iter().zip(right.iter()) { + match (l, r) { + (Some(lg), Some(rg)) => { + let left_geo = geometry_to_geo(&lg?)?; + let right_geo = geometry_to_geo(&rg?)?; + let dist = left_geo.hausdorff_distance(&right_geo); + builder.append_value(dist); + } + _ => builder.append_null(), + } + } + Ok(builder.finish()) +} + +// --------------------------------------------------------------------------- +// ST_FrechetDistance +// --------------------------------------------------------------------------- + +#[derive(Debug, Eq, PartialEq, Hash)] +pub struct FrechetDistance; + +impl FrechetDistance { + pub fn new() -> Self { + Self {} + } +} + +impl Default for FrechetDistance { + fn default() -> Self { + Self::new() + } +} + +static FRECHET_SIGNATURE: LazyLock = + LazyLock::new(|| Signature::any(2, Volatility::Immutable)); +static FRECHET_DOC: OnceLock = OnceLock::new(); + +impl ScalarUDFImpl for FrechetDistance { + fn as_any(&self) -> &dyn Any { + self + } + + fn name(&self) -> &str { + "st_frechetdistance" + } + + fn signature(&self) -> &Signature { + &FRECHET_SIGNATURE + } + + fn return_type(&self, _arg_types: &[DataType]) -> Result { + Ok(DataType::Float64) + } + + fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result { + Ok(frechet_impl(args)?) + } + + fn documentation(&self) -> Option<&Documentation> { + Some(FRECHET_DOC.get_or_init(|| { + Documentation::builder( + DOC_SECTION_OTHER, + "Returns the discrete Frechet distance between two geometries. The Frechet distance is a measure of similarity between curves that accounts for the ordering and location of the points along the curves.", + "ST_FrechetDistance(geomA, geomB)", + ) + .with_argument("geomA", "geometry") + .with_argument("geomB", "geometry") + .build() + })) + } +} + +fn frechet_impl(args: ScalarFunctionArgs) -> GeoDataFusionResult { + let arrays = ColumnarValue::values_to_arrays(&args.args)?; + let left_arr = from_arrow_array(&arrays[0], &args.arg_fields[0])?; + let right_arr = from_arrow_array(&arrays[1], &args.arg_fields[1])?; + let result = frechet_arrays(&left_arr, &right_arr)?; + Ok(ColumnarValue::Array(Arc::new(result))) +} + +fn frechet_arrays( + left: &dyn GeoArrowArray, + right: &dyn GeoArrowArray, +) -> GeoArrowResult { + downcast_geoarrow_array!(left, _frechet_left, right) +} + +fn _frechet_left<'a>( + left: &'a impl GeoArrowArrayAccessor<'a>, + right: &dyn GeoArrowArray, +) -> GeoArrowResult { + downcast_geoarrow_array!(right, _frechet_both, left) +} + +fn _frechet_both<'a, 'b>( + right: &'b impl GeoArrowArrayAccessor<'b>, + left: &'a impl GeoArrowArrayAccessor<'a>, +) -> GeoArrowResult { + use geo::FrechetDistance as GeoFrechet; + + let mut builder = Float64Builder::with_capacity(left.len()); + for (l, r) in left.iter().zip(right.iter()) { + match (l, r) { + (Some(lg), Some(rg)) => { + let left_geo = geometry_to_geo(&lg?)?; + let right_geo = geometry_to_geo(&rg?)?; + match (&left_geo, &right_geo) { + (geo::Geometry::LineString(ls_a), geo::Geometry::LineString(ls_b)) => { + let dist = ls_a.frechet_distance(ls_b); + builder.append_value(dist); + } + _ => builder.append_null(), + } + } + _ => builder.append_null(), + } + } + Ok(builder.finish()) +} + +// --------------------------------------------------------------------------- +// ST_ClosestPoint +// --------------------------------------------------------------------------- + +#[derive(Debug, Eq, PartialEq, Hash)] +pub struct ClosestPoint { + coord_type: CoordType, +} + +impl ClosestPoint { + pub fn new(coord_type: CoordType) -> Self { + Self { coord_type } + } +} + +impl Default for ClosestPoint { + fn default() -> Self { + Self::new(Default::default()) + } +} + +static CLOSEST_POINT_SIGNATURE: LazyLock = + LazyLock::new(|| Signature::any(2, Volatility::Immutable)); +static CLOSEST_POINT_DOC: OnceLock = OnceLock::new(); + +impl ScalarUDFImpl for ClosestPoint { + fn as_any(&self) -> &dyn Any { + self + } + + fn name(&self) -> &str { + "st_closestpoint" + } + + fn signature(&self) -> &Signature { + &CLOSEST_POINT_SIGNATURE + } + + fn return_type(&self, _arg_types: &[DataType]) -> Result { + Err(DataFusionError::Internal("return_type".to_string())) + } + + fn return_field_from_args(&self, args: ReturnFieldArgs) -> Result { + Ok(closest_point_return_field(args, self.coord_type)?) + } + + fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result { + Ok(closest_point_impl(args, self.coord_type)?) + } + + fn documentation(&self) -> Option<&Documentation> { + Some(CLOSEST_POINT_DOC.get_or_init(|| { + Documentation::builder( + DOC_SECTION_OTHER, + "Returns the 2D point on geomA that is closest to geomB. This is the point from which the shortest distance between the two geometries is measured.", + "ST_ClosestPoint(geomA, geomB)", + ) + .with_argument("geomA", "geometry") + .with_argument("geomB", "geometry") + .build() + })) + } +} + +fn closest_point_return_field( + args: ReturnFieldArgs, + coord_type: CoordType, +) -> GeoDataFusionResult { + let metadata = Arc::new(Metadata::try_from(args.arg_fields[0].as_ref()).unwrap_or_default()); + let output_type = PointType::new(Dimension::XY, metadata).with_coord_type(coord_type); + Ok(Arc::new(output_type.to_field("", true))) +} + +fn closest_point_impl( + args: ScalarFunctionArgs, + coord_type: CoordType, +) -> GeoDataFusionResult { + let arrays = ColumnarValue::values_to_arrays(&args.args)?; + let left_arr = from_arrow_array(&arrays[0], &args.arg_fields[0])?; + let right_arr = from_arrow_array(&arrays[1], &args.arg_fields[1])?; + let result = closest_point_arrays(&left_arr, &right_arr, coord_type)?; + Ok(ColumnarValue::Array(result.into_array_ref())) +} + +fn closest_point_arrays( + left: &dyn GeoArrowArray, + right: &dyn GeoArrowArray, + coord_type: CoordType, +) -> GeoArrowResult { + downcast_geoarrow_array!(left, _closest_point_left, right, coord_type) +} + +fn _closest_point_left<'a>( + left: &'a impl GeoArrowArrayAccessor<'a>, + right: &dyn GeoArrowArray, + coord_type: CoordType, +) -> GeoArrowResult { + downcast_geoarrow_array!(right, _closest_point_both, left, coord_type) +} + +fn _closest_point_both<'a, 'b>( + right: &'b impl GeoArrowArrayAccessor<'b>, + left: &'a impl GeoArrowArrayAccessor<'a>, + coord_type: CoordType, +) -> GeoArrowResult { + + + + let point_type = PointType::new(Dimension::XY, Default::default()).with_coord_type(coord_type); + let mut builder = geoarrow_array::builder::PointBuilder::new(point_type); + + for (l, r) in left.iter().zip(right.iter()) { + match (l, r) { + (Some(lg), Some(rg)) => { + let left_geo = geometry_to_geo(&lg?)?; + let right_geo = geometry_to_geo(&rg?)?; + // Find the closest point on left_geo to right_geo + // We need a point from right_geo to query against left_geo + // ClosestPoint trait: left_geo.closest_point(&point) where point is a Point + // For a general geometry, we use the centroid of right_geo as the query point, + // but the PostGIS semantics are: point on A closest to B. + // geo's ClosestPoint takes a Point argument. We need to iterate + // or use a different approach. We'll compute closest point on A to each + // vertex of B and pick the minimum distance one. + let closest = find_closest_point_on_geom(&left_geo, &right_geo); + match closest { + Some(pt) => builder.push_point(Some(&pt)), + None => builder.push_null(), + } + } + _ => builder.push_null(), + } + } + Ok(builder.finish()) +} + +/// Find the point on `geom_a` that is closest to `geom_b`. +/// +/// Iterates over all vertices of `geom_b`, queries `geom_a.closest_point(&vertex)` +/// for each, and returns the candidate with the minimum distance to the query vertex. +/// This is vertex-exact: the result is guaranteed to be the closest point on A to +/// one of B's vertices (not necessarily to an edge midpoint of B). +fn find_closest_point_on_geom( + geom_a: &geo::Geometry, + geom_b: &geo::Geometry, +) -> Option { + use geo::{Closest, ClosestPoint as GeoClosestPoint, CoordsIter, EuclideanDistance}; + + let mut best: Option = None; + let mut best_dist = f64::INFINITY; + + for coord in geom_b.coords_iter() { + let query = geo::Point::new(coord.x, coord.y); + let candidate = match geom_a.closest_point(&query) { + Closest::SinglePoint(p) | Closest::Intersection(p) => p, + Closest::Indeterminate => continue, + }; + let dist = candidate.euclidean_distance(&query); + if dist < best_dist { + best_dist = dist; + best = Some(candidate); + } + } + best +} + +// --------------------------------------------------------------------------- +// ST_MaxDistance +// --------------------------------------------------------------------------- + +#[derive(Debug, Eq, PartialEq, Hash)] +pub struct MaxDistance; + +impl MaxDistance { + pub fn new() -> Self { + Self {} + } +} + +impl Default for MaxDistance { + fn default() -> Self { + Self::new() + } +} + +static MAX_DISTANCE_SIGNATURE: LazyLock = + LazyLock::new(|| Signature::any(2, Volatility::Immutable)); +static MAX_DISTANCE_DOC: OnceLock = OnceLock::new(); + +impl ScalarUDFImpl for MaxDistance { + fn as_any(&self) -> &dyn Any { + self + } + + fn name(&self) -> &str { + "st_maxdistance" + } + + fn signature(&self) -> &Signature { + &MAX_DISTANCE_SIGNATURE + } + + fn return_type(&self, _arg_types: &[DataType]) -> Result { + Ok(DataType::Float64) + } + + fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result { + Ok(max_distance_impl(args)?) + } + + fn documentation(&self) -> Option<&Documentation> { + Some(MAX_DISTANCE_DOC.get_or_init(|| { + Documentation::builder( + DOC_SECTION_OTHER, + "Returns the maximum 2D Cartesian distance between two geometries, computed over all pairs of vertices.", + "ST_MaxDistance(geomA, geomB)", + ) + .with_argument("geomA", "geometry") + .with_argument("geomB", "geometry") + .build() + })) + } +} + +fn max_distance_impl(args: ScalarFunctionArgs) -> GeoDataFusionResult { + let arrays = ColumnarValue::values_to_arrays(&args.args)?; + let left_arr = from_arrow_array(&arrays[0], &args.arg_fields[0])?; + let right_arr = from_arrow_array(&arrays[1], &args.arg_fields[1])?; + let result = max_distance_arrays(&left_arr, &right_arr)?; + Ok(ColumnarValue::Array(Arc::new(result))) +} + +fn max_distance_arrays( + left: &dyn GeoArrowArray, + right: &dyn GeoArrowArray, +) -> GeoArrowResult { + downcast_geoarrow_array!(left, _max_distance_left, right) +} + +fn _max_distance_left<'a>( + left: &'a impl GeoArrowArrayAccessor<'a>, + right: &dyn GeoArrowArray, +) -> GeoArrowResult { + downcast_geoarrow_array!(right, _max_distance_both, left) +} + +fn _max_distance_both<'a, 'b>( + right: &'b impl GeoArrowArrayAccessor<'b>, + left: &'a impl GeoArrowArrayAccessor<'a>, +) -> GeoArrowResult { + use geo::CoordsIter; + use geo::EuclideanDistance; + + let mut builder = Float64Builder::with_capacity(left.len()); + for (l, r) in left.iter().zip(right.iter()) { + match (l, r) { + (Some(lg), Some(rg)) => { + let left_geo = geometry_to_geo(&lg?)?; + let right_geo = geometry_to_geo(&rg?)?; + let left_coords: Vec<_> = left_geo.coords_iter().collect(); + let right_coords: Vec<_> = right_geo.coords_iter().collect(); + let mut max_dist: f64 = 0.0; + for lc in &left_coords { + let lp = geo::Point::new(lc.x, lc.y); + for rc in &right_coords { + let rp = geo::Point::new(rc.x, rc.y); + let d = lp.euclidean_distance(&rp); + if d > max_dist { + max_dist = d; + } + } + } + builder.append_value(max_dist); + } + _ => builder.append_null(), + } + } + Ok(builder.finish()) +} + +#[cfg(test)] +mod test { + use approx::assert_relative_eq; + use arrow_array::cast::AsArray; + use arrow_array::types::Float64Type; + use datafusion::prelude::SessionContext; + + use super::*; + use crate::udf::native::io::GeomFromText; + + // ----------------------------------------------------------------------- + // ST_Perimeter tests + // ----------------------------------------------------------------------- + + #[tokio::test] + async fn test_perimeter_square() { + let ctx = SessionContext::new(); + ctx.register_udf(Perimeter::new().into()); + ctx.register_udf(GeomFromText::new(Default::default()).into()); + + // 1x1 square: perimeter = 4 + let df = ctx + .sql("SELECT ST_Perimeter(ST_GeomFromText('POLYGON((0 0, 1 0, 1 1, 0 1, 0 0))'));") + .await + .unwrap(); + let batch = df.collect().await.unwrap().into_iter().next().unwrap(); + let val = batch.column(0).as_primitive::().value(0); + assert_eq!(val, 4.0); + } + + #[tokio::test] + async fn test_perimeter_polygon_with_hole() { + let ctx = SessionContext::new(); + ctx.register_udf(Perimeter::new().into()); + ctx.register_udf(GeomFromText::new(Default::default()).into()); + + // Outer square 10x10 (perimeter 40), inner square 2x2 (perimeter 8) => total 48 + let df = ctx + .sql("SELECT ST_Perimeter(ST_GeomFromText('POLYGON((0 0, 10 0, 10 10, 0 10, 0 0), (4 4, 6 4, 6 6, 4 6, 4 4))'));") + .await + .unwrap(); + let batch = df.collect().await.unwrap().into_iter().next().unwrap(); + let val = batch.column(0).as_primitive::().value(0); + assert_eq!(val, 48.0); + } + + #[tokio::test] + async fn test_perimeter_point() { + let ctx = SessionContext::new(); + ctx.register_udf(Perimeter::new().into()); + ctx.register_udf(GeomFromText::new(Default::default()).into()); + + // Point has perimeter 0 + let df = ctx + .sql("SELECT ST_Perimeter(ST_GeomFromText('POINT(1 2)'));") + .await + .unwrap(); + let batch = df.collect().await.unwrap().into_iter().next().unwrap(); + let val = batch.column(0).as_primitive::().value(0); + assert_eq!(val, 0.0); + } + + #[tokio::test] + async fn test_perimeter_linestring() { + let ctx = SessionContext::new(); + ctx.register_udf(Perimeter::new().into()); + ctx.register_udf(GeomFromText::new(Default::default()).into()); + + // LineString has perimeter 0 (not an areal geometry) + let df = ctx + .sql("SELECT ST_Perimeter(ST_GeomFromText('LINESTRING(0 0, 1 1)'));") + .await + .unwrap(); + let batch = df.collect().await.unwrap().into_iter().next().unwrap(); + let val = batch.column(0).as_primitive::().value(0); + assert_eq!(val, 0.0); + } + + // ----------------------------------------------------------------------- + // ST_Azimuth tests + // ----------------------------------------------------------------------- + + #[tokio::test] + async fn test_azimuth_north() { + let ctx = SessionContext::new(); + ctx.register_udf(Azimuth::new().into()); + ctx.register_udf(GeomFromText::new(Default::default()).into()); + + // Point directly north => azimuth = 0 + let df = ctx + .sql("SELECT ST_Azimuth(ST_GeomFromText('POINT(0 0)'), ST_GeomFromText('POINT(0 1)'));") + .await + .unwrap(); + let batch = df.collect().await.unwrap().into_iter().next().unwrap(); + let val = batch.column(0).as_primitive::().value(0); + assert_relative_eq!(val, 0.0, epsilon = 1e-10); + } + + #[tokio::test] + async fn test_azimuth_east() { + let ctx = SessionContext::new(); + ctx.register_udf(Azimuth::new().into()); + ctx.register_udf(GeomFromText::new(Default::default()).into()); + + // Point directly east => azimuth = pi/2 + let df = ctx + .sql("SELECT ST_Azimuth(ST_GeomFromText('POINT(0 0)'), ST_GeomFromText('POINT(1 0)'));") + .await + .unwrap(); + let batch = df.collect().await.unwrap().into_iter().next().unwrap(); + let val = batch.column(0).as_primitive::().value(0); + assert_relative_eq!(val, std::f64::consts::FRAC_PI_2, epsilon = 1e-10); + } + + #[tokio::test] + async fn test_azimuth_south() { + let ctx = SessionContext::new(); + ctx.register_udf(Azimuth::new().into()); + ctx.register_udf(GeomFromText::new(Default::default()).into()); + + // Point directly south => azimuth = pi + let df = ctx + .sql( + "SELECT ST_Azimuth(ST_GeomFromText('POINT(0 0)'), ST_GeomFromText('POINT(0 -1)'));", + ) + .await + .unwrap(); + let batch = df.collect().await.unwrap().into_iter().next().unwrap(); + let val = batch.column(0).as_primitive::().value(0); + assert_relative_eq!(val, std::f64::consts::PI, epsilon = 1e-10); + } + + #[tokio::test] + async fn test_azimuth_west() { + let ctx = SessionContext::new(); + ctx.register_udf(Azimuth::new().into()); + ctx.register_udf(GeomFromText::new(Default::default()).into()); + + // Point directly west => azimuth = 3*pi/2 + let df = ctx + .sql("SELECT ST_Azimuth(ST_GeomFromText('POINT(0 0)'), ST_GeomFromText('POINT(-1 0)'));") + .await + .unwrap(); + let batch = df.collect().await.unwrap().into_iter().next().unwrap(); + let val = batch.column(0).as_primitive::().value(0); + assert_relative_eq!(val, 3.0 * std::f64::consts::FRAC_PI_2, epsilon = 1e-10); + } + + // ----------------------------------------------------------------------- + // ST_HausdorffDistance tests + // ----------------------------------------------------------------------- + + #[tokio::test] + async fn test_hausdorff_distance_identical() { + let ctx = SessionContext::new(); + ctx.register_udf(HausdorffDistance::new().into()); + ctx.register_udf(GeomFromText::new(Default::default()).into()); + + // Identical geometries => Hausdorff distance = 0 + let df = ctx + .sql("SELECT ST_HausdorffDistance(ST_GeomFromText('LINESTRING(0 0, 1 1)'), ST_GeomFromText('LINESTRING(0 0, 1 1)'));") + .await + .unwrap(); + let batch = df.collect().await.unwrap().into_iter().next().unwrap(); + let val = batch.column(0).as_primitive::().value(0); + assert_eq!(val, 0.0); + } + + #[tokio::test] + async fn test_hausdorff_distance_simple() { + let ctx = SessionContext::new(); + ctx.register_udf(HausdorffDistance::new().into()); + ctx.register_udf(GeomFromText::new(Default::default()).into()); + + // LINESTRING(0 0, 1 0) vs LINESTRING(0 1, 1 1) + // Every point on A is distance 1 from B, every point on B is distance 1 from A + // Hausdorff distance = 1 + let df = ctx + .sql("SELECT ST_HausdorffDistance(ST_GeomFromText('LINESTRING(0 0, 1 0)'), ST_GeomFromText('LINESTRING(0 1, 1 1)'));") + .await + .unwrap(); + let batch = df.collect().await.unwrap().into_iter().next().unwrap(); + let val = batch.column(0).as_primitive::().value(0); + assert_relative_eq!(val, 1.0, epsilon = 1e-10); + } + + #[tokio::test] + async fn test_hausdorff_distance_points() { + let ctx = SessionContext::new(); + ctx.register_udf(HausdorffDistance::new().into()); + ctx.register_udf(GeomFromText::new(Default::default()).into()); + + // POINT(0 0) vs POINT(3 4) => distance = 5 + let df = ctx + .sql("SELECT ST_HausdorffDistance(ST_GeomFromText('POINT(0 0)'), ST_GeomFromText('POINT(3 4)'));") + .await + .unwrap(); + let batch = df.collect().await.unwrap().into_iter().next().unwrap(); + let val = batch.column(0).as_primitive::().value(0); + assert_relative_eq!(val, 5.0, epsilon = 1e-10); + } + + // ----------------------------------------------------------------------- + // ST_FrechetDistance tests + // ----------------------------------------------------------------------- + + #[tokio::test] + async fn test_frechet_distance_identical() { + let ctx = SessionContext::new(); + ctx.register_udf(FrechetDistance::new().into()); + ctx.register_udf(GeomFromText::new(Default::default()).into()); + + // Identical linestrings => Frechet distance = 0 + let df = ctx + .sql("SELECT ST_FrechetDistance(ST_GeomFromText('LINESTRING(0 0, 1 1)'), ST_GeomFromText('LINESTRING(0 0, 1 1)'));") + .await + .unwrap(); + let batch = df.collect().await.unwrap().into_iter().next().unwrap(); + let val = batch.column(0).as_primitive::().value(0); + assert_eq!(val, 0.0); + } + + #[tokio::test] + async fn test_frechet_distance_simple() { + let ctx = SessionContext::new(); + ctx.register_udf(FrechetDistance::new().into()); + ctx.register_udf(GeomFromText::new(Default::default()).into()); + + // LINESTRING(0 0, 1 0) vs LINESTRING(0 1, 1 1) => Frechet distance = 1 + let df = ctx + .sql("SELECT ST_FrechetDistance(ST_GeomFromText('LINESTRING(0 0, 1 0)'), ST_GeomFromText('LINESTRING(0 1, 1 1)'));") + .await + .unwrap(); + let batch = df.collect().await.unwrap().into_iter().next().unwrap(); + let val = batch.column(0).as_primitive::().value(0); + assert_relative_eq!(val, 1.0, epsilon = 1e-10); + } + + // ----------------------------------------------------------------------- + // ST_ClosestPoint tests + // ----------------------------------------------------------------------- + + #[tokio::test] + async fn test_closest_point_on_line() { + use geo_traits::{CoordTrait, PointTrait}; + use geoarrow_array::array::PointArray; + + let ctx = SessionContext::new(); + ctx.register_udf(ClosestPoint::default().into()); + ctx.register_udf(GeomFromText::new(Default::default()).into()); + + // Closest point on LINESTRING(0 0, 10 0) to POINT(5 5) should be POINT(5 0) + let df = ctx + .sql("SELECT ST_ClosestPoint(ST_GeomFromText('LINESTRING(0 0, 10 0)'), ST_GeomFromText('POINT(5 5)'));") + .await + .unwrap(); + let batch = df.collect().await.unwrap().into_iter().next().unwrap(); + let geo_arr = + PointArray::try_from((batch.column(0).as_ref(), batch.schema().field(0))).unwrap(); + let point = geo_arr.value(0).unwrap(); + assert_relative_eq!(point.coord().unwrap().x(), 5.0, epsilon = 1e-10); + assert_relative_eq!(point.coord().unwrap().y(), 0.0, epsilon = 1e-10); + } + + #[tokio::test] + async fn test_closest_point_coincident() { + use geo_traits::{CoordTrait, PointTrait}; + use geoarrow_array::array::PointArray; + + let ctx = SessionContext::new(); + ctx.register_udf(ClosestPoint::default().into()); + ctx.register_udf(GeomFromText::new(Default::default()).into()); + + // Point on the line itself + let df = ctx + .sql("SELECT ST_ClosestPoint(ST_GeomFromText('LINESTRING(0 0, 10 0)'), ST_GeomFromText('POINT(3 0)'));") + .await + .unwrap(); + let batch = df.collect().await.unwrap().into_iter().next().unwrap(); + let geo_arr = + PointArray::try_from((batch.column(0).as_ref(), batch.schema().field(0))).unwrap(); + let point = geo_arr.value(0).unwrap(); + assert_relative_eq!(point.coord().unwrap().x(), 3.0, epsilon = 1e-10); + assert_relative_eq!(point.coord().unwrap().y(), 0.0, epsilon = 1e-10); + } + + // ----------------------------------------------------------------------- + // ST_MaxDistance tests + // ----------------------------------------------------------------------- + + #[tokio::test] + async fn test_max_distance_points() { + let ctx = SessionContext::new(); + ctx.register_udf(MaxDistance::new().into()); + ctx.register_udf(GeomFromText::new(Default::default()).into()); + + // POINT(0 0) vs POINT(3 4) => max distance = 5 + let df = ctx + .sql("SELECT ST_MaxDistance(ST_GeomFromText('POINT(0 0)'), ST_GeomFromText('POINT(3 4)'));") + .await + .unwrap(); + let batch = df.collect().await.unwrap().into_iter().next().unwrap(); + let val = batch.column(0).as_primitive::().value(0); + assert_relative_eq!(val, 5.0, epsilon = 1e-10); + } + + #[tokio::test] + async fn test_max_distance_line_to_point() { + let ctx = SessionContext::new(); + ctx.register_udf(MaxDistance::new().into()); + ctx.register_udf(GeomFromText::new(Default::default()).into()); + + // LINESTRING(0 0, 3 0) vs POINT(0 4) + // Max distance is from (3,0) to (0,4) = 5 + let df = ctx + .sql("SELECT ST_MaxDistance(ST_GeomFromText('LINESTRING(0 0, 3 0)'), ST_GeomFromText('POINT(0 4)'));") + .await + .unwrap(); + let batch = df.collect().await.unwrap().into_iter().next().unwrap(); + let val = batch.column(0).as_primitive::().value(0); + assert_relative_eq!(val, 5.0, epsilon = 1e-10); + } + + #[tokio::test] + async fn test_max_distance_same_geometry() { + let ctx = SessionContext::new(); + ctx.register_udf(MaxDistance::new().into()); + ctx.register_udf(GeomFromText::new(Default::default()).into()); + + // Square: max distance is the diagonal = sqrt(2) + let df = ctx + .sql("SELECT ST_MaxDistance(ST_GeomFromText('POLYGON((0 0, 1 0, 1 1, 0 1, 0 0))'), ST_GeomFromText('POLYGON((0 0, 1 0, 1 1, 0 1, 0 0))'));") + .await + .unwrap(); + let batch = df.collect().await.unwrap().into_iter().next().unwrap(); + let val = batch.column(0).as_primitive::().value(0); + assert_relative_eq!(val, std::f64::consts::SQRT_2, epsilon = 1e-10); + } +} diff --git a/rust/geodatafusion/src/udf/geo/measurement/mod.rs b/rust/geodatafusion/src/udf/geo/measurement/mod.rs index 0600372..5f66a64 100644 --- a/rust/geodatafusion/src/udf/geo/measurement/mod.rs +++ b/rust/geodatafusion/src/udf/geo/measurement/mod.rs @@ -1,13 +1,21 @@ +mod advanced; mod area; mod distance; mod length; +pub use advanced::{Azimuth, ClosestPoint, FrechetDistance, HausdorffDistance, MaxDistance, Perimeter}; pub use area::Area; pub use distance::Distance; pub use length::Length; pub fn register(session_context: &datafusion::prelude::SessionContext) { session_context.register_udf(Area.into()); + session_context.register_udf(Azimuth.into()); + session_context.register_udf(ClosestPoint::default().into()); session_context.register_udf(Distance.into()); + session_context.register_udf(FrechetDistance.into()); + session_context.register_udf(HausdorffDistance.into()); session_context.register_udf(Length.into()); + session_context.register_udf(MaxDistance.into()); + session_context.register_udf(Perimeter.into()); } diff --git a/rust/geodatafusion/src/udf/geo/relationships/dwithin.rs b/rust/geodatafusion/src/udf/geo/relationships/dwithin.rs new file mode 100644 index 0000000..a2bdae2 --- /dev/null +++ b/rust/geodatafusion/src/udf/geo/relationships/dwithin.rs @@ -0,0 +1,303 @@ +use std::any::Any; +use std::sync::OnceLock; + +use arrow_array::BooleanArray; +use arrow_array::builder::BooleanBuilder; +use arrow_schema::{DataType, Field}; +use datafusion::error::{DataFusionError, Result}; +use datafusion::logical_expr::scalar_doc_sections::DOC_SECTION_OTHER; +use datafusion::logical_expr::{ + ColumnarValue, Documentation, ScalarFunctionArgs, ScalarUDFImpl, Signature, Volatility, +}; +use datafusion::scalar::ScalarValue; +use geo::EuclideanDistance; +use geoarrow_array::array::from_arrow_array; +use geoarrow_array::{GeoArrowArray, GeoArrowArrayAccessor, downcast_geoarrow_array}; +use geoarrow_expr_geo::util::to_geo::geometry_to_geo; +use geoarrow_schema::error::GeoArrowResult; + +use crate::error::GeoDataFusionResult; + +#[derive(Debug, Eq, PartialEq, Hash)] +pub struct DWithin { + signature: Signature, +} + +impl DWithin { + pub fn new() -> Self { + Self { + signature: Signature::any(3, Volatility::Immutable), + } + } +} + +impl Default for DWithin { + fn default() -> Self { + Self::new() + } +} + +static DOCUMENTATION: OnceLock = OnceLock::new(); + +impl ScalarUDFImpl for DWithin { + fn as_any(&self) -> &dyn Any { + self + } + + fn name(&self) -> &str { + "st_dwithin" + } + + fn signature(&self) -> &Signature { + &self.signature + } + + fn return_type(&self, _arg_types: &[DataType]) -> Result { + Ok(DataType::Boolean) + } + + fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result { + let mut arg_iter = args.args.into_iter(); + let left = arg_iter.next().unwrap(); + let right = arg_iter.next().unwrap(); + let distance_arg = arg_iter.next().unwrap(); + + let distance = match distance_arg.cast_to(&DataType::Float64, None)? { + ColumnarValue::Scalar(ScalarValue::Float64(Some(d))) => d, + _ => { + return Err(DataFusionError::NotImplemented( + "Vectorized distance not supported for ST_DWithin".to_string(), + )); + } + }; + + Ok(dwithin_impl( + left, + &args.arg_fields[0], + right, + &args.arg_fields[1], + distance, + )?) + } + + fn documentation(&self) -> Option<&Documentation> { + Some(DOCUMENTATION.get_or_init(|| { + Documentation::builder( + DOC_SECTION_OTHER, + "Returns true if the geometries are within a given distance of one another. The distance is specified in units of the spatial reference system.", + "ST_DWithin(geomA, geomB, distance)", + ) + .with_argument("geomA", "geometry") + .with_argument("geomB", "geometry") + .with_argument("distance", "float8 - distance threshold") + .build() + })) + } +} + +fn dwithin_impl( + left: ColumnarValue, + left_field: &Field, + right: ColumnarValue, + right_field: &Field, + distance: f64, +) -> GeoDataFusionResult { + match (left, right) { + (ColumnarValue::Scalar(l), ColumnarValue::Scalar(r)) => { + let mut arrays = + ColumnarValue::values_to_arrays(&[ColumnarValue::Scalar(l), ColumnarValue::Scalar(r)])? + .into_iter(); + dwithin_impl( + ColumnarValue::Array(arrays.next().unwrap()), + left_field, + ColumnarValue::Array(arrays.next().unwrap()), + right_field, + distance, + ) + } + (ColumnarValue::Array(l_arr), ColumnarValue::Array(r_arr)) => { + let left_geo = from_arrow_array(&l_arr, left_field)?; + let right_geo = from_arrow_array(&r_arr, right_field)?; + let result = dwithin_arrays(&left_geo, &right_geo, distance)?; + Ok(ColumnarValue::Array(std::sync::Arc::new(result))) + } + (ColumnarValue::Scalar(l), ColumnarValue::Array(r_arr)) => { + let l_arrays = ColumnarValue::values_to_arrays(&[ColumnarValue::Scalar(l)])?; + let left_geo = from_arrow_array(&l_arrays[0], left_field)?; + let right_geo = from_arrow_array(&r_arr, right_field)?; + let result = dwithin_broadcast_left(&left_geo, &right_geo, distance)?; + Ok(ColumnarValue::Array(std::sync::Arc::new(result))) + } + (ColumnarValue::Array(l_arr), ColumnarValue::Scalar(r)) => { + let r_arrays = ColumnarValue::values_to_arrays(&[ColumnarValue::Scalar(r)])?; + let left_geo = from_arrow_array(&l_arr, left_field)?; + let right_geo = from_arrow_array(&r_arrays[0], right_field)?; + let result = dwithin_broadcast_right(&left_geo, &right_geo, distance)?; + Ok(ColumnarValue::Array(std::sync::Arc::new(result))) + } + } +} + +fn dwithin_arrays( + left: &dyn GeoArrowArray, + right: &dyn GeoArrowArray, + distance: f64, +) -> GeoArrowResult { + downcast_geoarrow_array!(left, _dwithin_left, right, distance) +} + +fn _dwithin_left<'a>( + left: &'a impl GeoArrowArrayAccessor<'a>, + right: &dyn GeoArrowArray, + distance: f64, +) -> GeoArrowResult { + downcast_geoarrow_array!(right, _dwithin_both, left, distance) +} + +fn _dwithin_both<'a, 'b>( + right: &'b impl GeoArrowArrayAccessor<'b>, + left: &'a impl GeoArrowArrayAccessor<'a>, + distance: f64, +) -> GeoArrowResult { + let mut builder = BooleanBuilder::with_capacity(left.len()); + for (l, r) in left.iter().zip(right.iter()) { + match (l, r) { + (Some(lg), Some(rg)) => { + let left_geo = geometry_to_geo(&lg?)?; + let right_geo = geometry_to_geo(&rg?)?; + builder.append_value(left_geo.euclidean_distance(&right_geo) <= distance); + } + _ => builder.append_null(), + } + } + Ok(builder.finish()) +} + +fn dwithin_broadcast_left( + left: &dyn GeoArrowArray, + right: &dyn GeoArrowArray, + distance: f64, +) -> GeoArrowResult { + downcast_geoarrow_array!(left, _dwithin_bl_left, right, distance) +} + +fn _dwithin_bl_left<'a>( + left: &'a impl GeoArrowArrayAccessor<'a>, + right: &dyn GeoArrowArray, + distance: f64, +) -> GeoArrowResult { + let left_geom = left + .iter() + .next() + .unwrap() + .map(|g| geometry_to_geo(&g?)) + .transpose()?; + + downcast_geoarrow_array!(right, _dwithin_bl_right, &left_geom, distance) +} + +fn _dwithin_bl_right<'b>( + right: &'b impl GeoArrowArrayAccessor<'b>, + left_geom: &Option, + distance: f64, +) -> GeoArrowResult { + let mut builder = BooleanBuilder::with_capacity(right.len()); + if let Some(lg) = left_geom { + for r in right.iter() { + if let Some(rg) = r { + let right_geo = geometry_to_geo(&rg?)?; + builder.append_value(lg.euclidean_distance(&right_geo) <= distance); + } else { + builder.append_null(); + } + } + } else { + for _ in 0..right.len() { + builder.append_null(); + } + } + Ok(builder.finish()) +} + +fn dwithin_broadcast_right( + left: &dyn GeoArrowArray, + right: &dyn GeoArrowArray, + distance: f64, +) -> GeoArrowResult { + downcast_geoarrow_array!(right, _dwithin_br_right, left, distance) +} + +fn _dwithin_br_right<'b>( + right: &'b impl GeoArrowArrayAccessor<'b>, + left: &dyn GeoArrowArray, + distance: f64, +) -> GeoArrowResult { + let right_geom = right + .iter() + .next() + .unwrap() + .map(|g| geometry_to_geo(&g?)) + .transpose()?; + + downcast_geoarrow_array!(left, _dwithin_br_left, &right_geom, distance) +} + +fn _dwithin_br_left<'a>( + left: &'a impl GeoArrowArrayAccessor<'a>, + right_geom: &Option, + distance: f64, +) -> GeoArrowResult { + let mut builder = BooleanBuilder::with_capacity(left.len()); + if let Some(rg) = right_geom { + for l in left.iter() { + if let Some(lg) = l { + let left_geo = geometry_to_geo(&lg?)?; + builder.append_value(left_geo.euclidean_distance(rg) <= distance); + } else { + builder.append_null(); + } + } + } else { + for _ in 0..left.len() { + builder.append_null(); + } + } + Ok(builder.finish()) +} + +#[cfg(test)] +mod test { + use arrow_array::cast::AsArray; + use datafusion::prelude::SessionContext; + + use super::*; + use crate::udf::native::io::GeomFromText; + + #[tokio::test] + async fn test_dwithin_true() { + let ctx = SessionContext::new(); + ctx.register_udf(DWithin::new().into()); + ctx.register_udf(GeomFromText::default().into()); + + let df = ctx + .sql("SELECT ST_DWithin(ST_GeomFromText('POINT(0 0)'), ST_GeomFromText('POINT(1 0)'), 1.5);") + .await + .unwrap(); + let batch = df.collect().await.unwrap().into_iter().next().unwrap(); + assert!(batch.column(0).as_boolean().value(0)); + } + + #[tokio::test] + async fn test_dwithin_false() { + let ctx = SessionContext::new(); + ctx.register_udf(DWithin::new().into()); + ctx.register_udf(GeomFromText::default().into()); + + let df = ctx + .sql("SELECT ST_DWithin(ST_GeomFromText('POINT(0 0)'), ST_GeomFromText('POINT(10 0)'), 1.5);") + .await + .unwrap(); + let batch = df.collect().await.unwrap().into_iter().next().unwrap(); + assert!(!batch.column(0).as_boolean().value(0)); + } +} diff --git a/rust/geodatafusion/src/udf/geo/relationships/mod.rs b/rust/geodatafusion/src/udf/geo/relationships/mod.rs index cea1af9..77c7806 100644 --- a/rust/geodatafusion/src/udf/geo/relationships/mod.rs +++ b/rust/geodatafusion/src/udf/geo/relationships/mod.rs @@ -1,5 +1,9 @@ +mod dwithin; +mod relate; mod topological; +pub use dwithin::DWithin; +pub use relate::{StRelate, StRelateMatch}; pub use topological::{ Contains, CoveredBy, Covers, Crosses, Disjoint, Equals, Intersects, Overlaps, Touches, Within, }; @@ -10,9 +14,12 @@ pub fn register(session_context: &datafusion::prelude::SessionContext) { session_context.register_udf(Covers::default().into()); session_context.register_udf(Crosses::default().into()); session_context.register_udf(Disjoint::default().into()); + session_context.register_udf(DWithin::default().into()); session_context.register_udf(Equals::default().into()); session_context.register_udf(Intersects::default().into()); session_context.register_udf(Overlaps::default().into()); + session_context.register_udf(StRelate::default().into()); + session_context.register_udf(StRelateMatch::default().into()); session_context.register_udf(Touches::default().into()); session_context.register_udf(Within::default().into()); } diff --git a/rust/geodatafusion/src/udf/geo/relationships/relate.rs b/rust/geodatafusion/src/udf/geo/relationships/relate.rs new file mode 100644 index 0000000..8931387 --- /dev/null +++ b/rust/geodatafusion/src/udf/geo/relationships/relate.rs @@ -0,0 +1,386 @@ +use std::any::Any; +use std::sync::{Arc, OnceLock}; + +use arrow_array::{Array, StringArray}; +use arrow_array::builder::StringBuilder; +use arrow_schema::{DataType, Field}; +use datafusion::error::Result; +use datafusion::logical_expr::scalar_doc_sections::DOC_SECTION_OTHER; +use datafusion::logical_expr::{ + ColumnarValue, Documentation, ScalarFunctionArgs, ScalarUDFImpl, Signature, Volatility, +}; +use geo::Relate as GeoRelate; +use geoarrow_array::array::from_arrow_array; +use geoarrow_array::{GeoArrowArray, GeoArrowArrayAccessor, downcast_geoarrow_array}; +use geoarrow_expr_geo::util::to_geo::geometry_to_geo; +use geoarrow_schema::error::GeoArrowResult; + +use crate::error::GeoDataFusionResult; + +/// ST_Relate — returns the DE-9IM intersection matrix string for two geometries. +#[derive(Debug, Eq, PartialEq, Hash)] +pub struct StRelate { + signature: Signature, +} + +impl StRelate { + pub fn new() -> Self { + Self { + signature: Signature::any(2, Volatility::Immutable), + } + } +} + +impl Default for StRelate { + fn default() -> Self { + Self::new() + } +} + +static RELATE_DOC: OnceLock = OnceLock::new(); + +impl ScalarUDFImpl for StRelate { + fn as_any(&self) -> &dyn Any { + self + } + + fn name(&self) -> &str { + "st_relate" + } + + fn signature(&self) -> &Signature { + &self.signature + } + + fn return_type(&self, _arg_types: &[DataType]) -> Result { + Ok(DataType::Utf8) + } + + fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result { + let mut arg_iter = args.args.into_iter(); + let left = arg_iter.next().unwrap(); + let right = arg_iter.next().unwrap(); + Ok(relate_string_impl(left, &args.arg_fields[0], right, &args.arg_fields[1])?) + } + + fn documentation(&self) -> Option<&Documentation> { + Some(RELATE_DOC.get_or_init(|| { + Documentation::builder( + DOC_SECTION_OTHER, + "Returns the DE-9IM (Dimensionally Extended 9-Intersection Model) matrix string representing the spatial relationship between two geometries.", + "ST_Relate(geomA, geomB)", + ) + .with_argument("geomA", "geometry") + .with_argument("geomB", "geometry") + .build() + })) + } +} + +fn relate_string_impl( + left: ColumnarValue, + left_field: &Field, + right: ColumnarValue, + right_field: &Field, +) -> GeoDataFusionResult { + match (left, right) { + (ColumnarValue::Scalar(l), ColumnarValue::Scalar(r)) => { + let mut arrays = + ColumnarValue::values_to_arrays(&[ColumnarValue::Scalar(l), ColumnarValue::Scalar(r)])? + .into_iter(); + relate_string_impl( + ColumnarValue::Array(arrays.next().unwrap()), + left_field, + ColumnarValue::Array(arrays.next().unwrap()), + right_field, + ) + } + (ColumnarValue::Array(l), ColumnarValue::Array(r)) => { + let left_geo = from_arrow_array(&l, left_field)?; + let right_geo = from_arrow_array(&r, right_field)?; + let result = relate_arrays(&left_geo, &right_geo)?; + Ok(ColumnarValue::Array(Arc::new(result))) + } + (ColumnarValue::Scalar(l), ColumnarValue::Array(r_arr)) => { + let l_arrays = ColumnarValue::values_to_arrays(&[ColumnarValue::Scalar(l)])?; + let left_geo = from_arrow_array(&l_arrays[0], left_field)?; + let right_geo = from_arrow_array(&r_arr, right_field)?; + let result = relate_broadcast_left(&left_geo, &right_geo)?; + Ok(ColumnarValue::Array(Arc::new(result))) + } + (ColumnarValue::Array(l_arr), ColumnarValue::Scalar(r)) => { + let r_arrays = ColumnarValue::values_to_arrays(&[ColumnarValue::Scalar(r)])?; + let left_geo = from_arrow_array(&l_arr, left_field)?; + let right_geo = from_arrow_array(&r_arrays[0], right_field)?; + let result = relate_broadcast_right(&left_geo, &right_geo)?; + Ok(ColumnarValue::Array(Arc::new(result))) + } + } +} + +fn relate_arrays( + left: &dyn GeoArrowArray, + right: &dyn GeoArrowArray, +) -> GeoArrowResult { + downcast_geoarrow_array!(left, _relate_left, right) +} + +fn _relate_left<'a>( + left: &'a impl GeoArrowArrayAccessor<'a>, + right: &dyn GeoArrowArray, +) -> GeoArrowResult { + downcast_geoarrow_array!(right, _relate_both, left) +} + +fn _relate_both<'a, 'b>( + right: &'b impl GeoArrowArrayAccessor<'b>, + left: &'a impl GeoArrowArrayAccessor<'a>, +) -> GeoArrowResult { + let mut builder = StringBuilder::with_capacity(left.len(), left.len() * 9); + for (l, r) in left.iter().zip(right.iter()) { + match (l, r) { + (Some(lg), Some(rg)) => { + let left_geo = geometry_to_geo(&lg?)?; + let right_geo = geometry_to_geo(&rg?)?; + let matrix = left_geo.relate(&right_geo); + builder.append_value(intersection_matrix_to_string(&matrix)); + } + _ => builder.append_null(), + } + } + Ok(builder.finish()) +} + +/// ST_RelateMatch — tests if a DE-9IM matrix matches a given pattern. +#[derive(Debug, Eq, PartialEq, Hash)] +pub struct StRelateMatch { + signature: Signature, +} + +impl StRelateMatch { + pub fn new() -> Self { + Self { + signature: Signature::exact( + vec![DataType::Utf8, DataType::Utf8], + Volatility::Immutable, + ), + } + } +} + +impl Default for StRelateMatch { + fn default() -> Self { + Self::new() + } +} + +static RELATE_MATCH_DOC: OnceLock = OnceLock::new(); + +impl ScalarUDFImpl for StRelateMatch { + fn as_any(&self) -> &dyn Any { + self + } + + fn name(&self) -> &str { + "st_relatematch" + } + + fn signature(&self) -> &Signature { + &self.signature + } + + fn return_type(&self, _arg_types: &[DataType]) -> Result { + Ok(DataType::Boolean) + } + + fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result { + let arrays = ColumnarValue::values_to_arrays(&args.args)?; + let matrices = arrays[0].as_any().downcast_ref::().unwrap(); + let patterns = arrays[1].as_any().downcast_ref::().unwrap(); + + let mut builder = arrow_array::builder::BooleanBuilder::with_capacity(matrices.len()); + + for i in 0..matrices.len() { + if matrices.is_null(i) || patterns.is_null(i) { + builder.append_null(); + } else { + let matrix_str = matrices.value(i); + let pattern = patterns.value(i); + let matches = de9im_matches(matrix_str, pattern); + builder.append_value(matches); + } + } + + Ok(ColumnarValue::Array(Arc::new(builder.finish()))) + } + + fn documentation(&self) -> Option<&Documentation> { + Some(RELATE_MATCH_DOC.get_or_init(|| { + Documentation::builder( + DOC_SECTION_OTHER, + "Tests if a DE-9IM intersection matrix matches a given pattern string.", + "ST_RelateMatch(matrix, pattern)", + ) + .with_argument("matrix", "text - a DE-9IM matrix string (9 characters)") + .with_argument("pattern", "text - a DE-9IM pattern string") + .build() + })) + } +} + +fn relate_broadcast_left( + left: &dyn GeoArrowArray, + right: &dyn GeoArrowArray, +) -> GeoArrowResult { + downcast_geoarrow_array!(left, _relate_bl_left, right) +} + +fn _relate_bl_left<'a>( + left: &'a impl GeoArrowArrayAccessor<'a>, + right: &dyn GeoArrowArray, +) -> GeoArrowResult { + let left_geom = left + .iter() + .next() + .unwrap() + .map(|g| geometry_to_geo(&g?)) + .transpose()?; + downcast_geoarrow_array!(right, _relate_bl_right, &left_geom) +} + +fn _relate_bl_right<'b>( + right: &'b impl GeoArrowArrayAccessor<'b>, + left_geom: &Option, +) -> GeoArrowResult { + let mut builder = StringBuilder::with_capacity(right.len(), right.len() * 9); + if let Some(lg) = left_geom { + for r in right.iter() { + if let Some(rg) = r { + let right_geo = geometry_to_geo(&rg?)?; + let matrix = lg.relate(&right_geo); + builder.append_value(intersection_matrix_to_string(&matrix)); + } else { + builder.append_null(); + } + } + } else { + for _ in 0..right.len() { + builder.append_null(); + } + } + Ok(builder.finish()) +} + +fn relate_broadcast_right( + left: &dyn GeoArrowArray, + right: &dyn GeoArrowArray, +) -> GeoArrowResult { + downcast_geoarrow_array!(right, _relate_br_right, left) +} + +fn _relate_br_right<'b>( + right: &'b impl GeoArrowArrayAccessor<'b>, + left: &dyn GeoArrowArray, +) -> GeoArrowResult { + let right_geom = right + .iter() + .next() + .unwrap() + .map(|g| geometry_to_geo(&g?)) + .transpose()?; + downcast_geoarrow_array!(left, _relate_br_left, &right_geom) +} + +fn _relate_br_left<'a>( + left: &'a impl GeoArrowArrayAccessor<'a>, + right_geom: &Option, +) -> GeoArrowResult { + let mut builder = StringBuilder::with_capacity(left.len(), left.len() * 9); + if let Some(rg) = right_geom { + for l in left.iter() { + if let Some(lg) = l { + let left_geo = geometry_to_geo(&lg?)?; + let matrix = left_geo.relate(rg); + builder.append_value(intersection_matrix_to_string(&matrix)); + } else { + builder.append_null(); + } + } + } else { + for _ in 0..left.len() { + builder.append_null(); + } + } + Ok(builder.finish()) +} + +/// Convert an `IntersectionMatrix` to its 9-character DE-9IM string using the +/// public `get` API, avoiding any reliance on the `Debug` format. +fn intersection_matrix_to_string(matrix: &geo::relate::IntersectionMatrix) -> String { + use geo::coordinate_position::CoordPos; + use geo::dimensions::Dimensions; + let mut s = String::with_capacity(9); + for a in [CoordPos::Inside, CoordPos::OnBoundary, CoordPos::Outside] { + for b in [CoordPos::Inside, CoordPos::OnBoundary, CoordPos::Outside] { + s.push(match matrix.get(a, b) { + Dimensions::Empty => 'F', + Dimensions::ZeroDimensional => '0', + Dimensions::OneDimensional => '1', + Dimensions::TwoDimensional => '2', + }); + } + } + s +} + +/// Check if a DE-9IM matrix string matches a pattern. +fn de9im_matches(matrix: &str, pattern: &str) -> bool { + if matrix.len() != 9 || pattern.len() != 9 { + return false; + } + + matrix + .chars() + .zip(pattern.chars()) + .all(|(m, p)| match p { + '*' => true, + 'T' | 't' => m != 'F' && m != 'f', + 'F' | 'f' => m == 'F' || m == 'f', + '0' => m == '0', + '1' => m == '1', + '2' => m == '2', + _ => false, + }) +} + +#[cfg(test)] +mod test { + use arrow_array::cast::AsArray; + use datafusion::prelude::SessionContext; + + use super::*; + use crate::udf::native::io::GeomFromText; + + #[tokio::test] + async fn test_relate() { + let ctx = SessionContext::new(); + ctx.register_udf(StRelate::new().into()); + ctx.register_udf(GeomFromText::default().into()); + + let df = ctx + .sql("SELECT ST_Relate(ST_GeomFromText('POINT(0 0)'), ST_GeomFromText('LINESTRING(0 0, 1 1)'));") + .await + .unwrap(); + let batch = df.collect().await.unwrap().into_iter().next().unwrap(); + let val = batch.column(0).as_string::().value(0); + // Point on start of line: F0FFFF102 + assert_eq!(val.len(), 9); + } + + #[test] + fn test_de9im_matches() { + assert!(de9im_matches("212101212", "T*T***T**")); + assert!(!de9im_matches("FF*FF****", "T*T***T**")); + assert!(de9im_matches("FF*FF****", "FF*FF****")); + } +} diff --git a/rust/geodatafusion/src/udf/native/accessors/geometry_accessors.rs b/rust/geodatafusion/src/udf/native/accessors/geometry_accessors.rs new file mode 100644 index 0000000..822639d --- /dev/null +++ b/rust/geodatafusion/src/udf/native/accessors/geometry_accessors.rs @@ -0,0 +1,963 @@ +//! Additional geometry accessor functions: ST_IsEmpty, ST_IsClosed, ST_IsSimple, +//! ST_IsRing, ST_IsCollection, ST_Dimension, ST_NumGeometries, ST_HasZ, ST_HasM, +//! ST_NRings. + +use std::any::Any; +use std::sync::{Arc, OnceLock}; + +use arrow_array::builder::{BooleanBuilder, Int32Builder, UInt32Builder}; +use arrow_array::{ArrayRef, BooleanArray, Int32Array, UInt32Array}; +use arrow_schema::DataType; +use datafusion::error::Result; +use datafusion::logical_expr::scalar_doc_sections::DOC_SECTION_OTHER; +use datafusion::logical_expr::{ + ColumnarValue, Documentation, ScalarFunctionArgs, ScalarUDFImpl, Signature, +}; +use geo_traits::*; +use geoarrow_array::array::from_arrow_array; +use geoarrow_array::{GeoArrowArray, GeoArrowArrayAccessor, downcast_geoarrow_array}; +use geoarrow_schema::error::GeoArrowResult; +use geoarrow_schema::Dimension; + +use crate::data_types::any_single_geometry_type_input; +use crate::error::GeoDataFusionResult; + +// ========================================================================= +// ST_IsEmpty +// ========================================================================= +#[derive(Debug, Eq, PartialEq, Hash)] +pub struct IsEmpty; + +impl Default for IsEmpty { + fn default() -> Self { + Self + } +} + +static IS_EMPTY_DOC: OnceLock = OnceLock::new(); + +impl ScalarUDFImpl for IsEmpty { + fn as_any(&self) -> &dyn Any { + self + } + + fn name(&self) -> &str { + "st_isempty" + } + + fn signature(&self) -> &Signature { + any_single_geometry_type_input() + } + + fn return_type(&self, _arg_types: &[DataType]) -> Result { + Ok(DataType::Boolean) + } + + fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result { + Ok(is_empty_impl(args)?) + } + + fn documentation(&self) -> Option<&Documentation> { + Some(IS_EMPTY_DOC.get_or_init(|| { + Documentation::builder( + DOC_SECTION_OTHER, + "Returns true if the geometry is an empty geometry (has no points).", + "ST_IsEmpty(geometry)", + ) + .with_argument("geom", "geometry") + .build() + })) + } +} + +fn is_empty_impl(args: ScalarFunctionArgs) -> GeoDataFusionResult { + let arrays = ColumnarValue::values_to_arrays(&args.args)?; + let geo_array = from_arrow_array(&arrays[0], &args.arg_fields[0])?; + let result = is_empty_array(&geo_array)?; + Ok(ColumnarValue::Array(Arc::new(result) as ArrayRef)) +} + +fn is_empty_array(array: &dyn GeoArrowArray) -> GeoArrowResult { + downcast_geoarrow_array!(array, _is_empty_impl) +} + +fn _is_empty_impl<'a>(array: &'a impl GeoArrowArrayAccessor<'a>) -> GeoArrowResult { + let mut builder = BooleanBuilder::with_capacity(array.len()); + for item in array.iter() { + if let Some(geom) = item { + let g = geom?; + let empty = match g.as_type() { + GeometryType::Point(p) => p.coord().is_none(), + GeometryType::LineString(ls) => ls.num_coords() == 0, + GeometryType::Polygon(p) => { + p.exterior().map_or(true, |ext| ext.num_coords() == 0) + } + GeometryType::MultiPoint(mp) => mp.num_points() == 0, + GeometryType::MultiLineString(mls) => mls.num_line_strings() == 0, + GeometryType::MultiPolygon(mp) => mp.num_polygons() == 0, + GeometryType::GeometryCollection(gc) => gc.num_geometries() == 0, + _ => false, + }; + builder.append_value(empty); + } else { + builder.append_null(); + } + } + Ok(builder.finish()) +} + +// ========================================================================= +// ST_IsClosed +// ========================================================================= +#[derive(Debug, Eq, PartialEq, Hash)] +pub struct IsClosed; + +impl Default for IsClosed { + fn default() -> Self { + Self + } +} + +static IS_CLOSED_DOC: OnceLock = OnceLock::new(); + +impl ScalarUDFImpl for IsClosed { + fn as_any(&self) -> &dyn Any { + self + } + + fn name(&self) -> &str { + "st_isclosed" + } + + fn signature(&self) -> &Signature { + any_single_geometry_type_input() + } + + fn return_type(&self, _arg_types: &[DataType]) -> Result { + Ok(DataType::Boolean) + } + + fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result { + Ok(is_closed_impl(args)?) + } + + fn documentation(&self) -> Option<&Documentation> { + Some(IS_CLOSED_DOC.get_or_init(|| { + Documentation::builder( + DOC_SECTION_OTHER, + "Returns true if the geometry's start and end points are coincident.", + "ST_IsClosed(geometry)", + ) + .with_argument("geom", "geometry") + .build() + })) + } +} + +fn is_closed_impl(args: ScalarFunctionArgs) -> GeoDataFusionResult { + let arrays = ColumnarValue::values_to_arrays(&args.args)?; + let geo_array = from_arrow_array(&arrays[0], &args.arg_fields[0])?; + let result = is_closed_array(&geo_array)?; + Ok(ColumnarValue::Array(Arc::new(result) as ArrayRef)) +} + +fn is_closed_array(array: &dyn GeoArrowArray) -> GeoArrowResult { + downcast_geoarrow_array!(array, _is_closed_impl) +} + +fn _is_closed_impl<'a>( + array: &'a impl GeoArrowArrayAccessor<'a>, +) -> GeoArrowResult { + use geoarrow_expr_geo::util::to_geo::geometry_to_geo; + let mut builder = BooleanBuilder::with_capacity(array.len()); + for item in array.iter() { + if let Some(geom) = item { + let geo_geom = geometry_to_geo(&geom?)?; + let closed = is_geo_closed(&geo_geom); + builder.append_value(closed); + } else { + builder.append_null(); + } + } + Ok(builder.finish()) +} + +fn is_geo_closed(geom: &geo::Geometry) -> bool { + match geom { + geo::Geometry::Point(_) => true, + geo::Geometry::LineString(ls) => is_linestring_closed(ls), + geo::Geometry::Polygon(_) => true, + geo::Geometry::MultiLineString(mls) => mls.0.iter().all(is_linestring_closed), + _ => true, + } +} + +fn is_linestring_closed(ls: &geo::LineString) -> bool { + if ls.0.len() < 2 { + return false; + } + let first = ls.0.first().unwrap(); + let last = ls.0.last().unwrap(); + first.x == last.x && first.y == last.y +} + +// ========================================================================= +// ST_IsSimple +// ========================================================================= +#[derive(Debug, Eq, PartialEq, Hash)] +pub struct IsSimple; + +impl Default for IsSimple { + fn default() -> Self { + Self + } +} + +static IS_SIMPLE_DOC: OnceLock = OnceLock::new(); + +impl ScalarUDFImpl for IsSimple { + fn as_any(&self) -> &dyn Any { + self + } + + fn name(&self) -> &str { + "st_issimple" + } + + fn signature(&self) -> &Signature { + any_single_geometry_type_input() + } + + fn return_type(&self, _arg_types: &[DataType]) -> Result { + Ok(DataType::Boolean) + } + + fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result { + Ok(is_simple_impl(args)?) + } + + fn documentation(&self) -> Option<&Documentation> { + Some(IS_SIMPLE_DOC.get_or_init(|| { + Documentation::builder( + DOC_SECTION_OTHER, + "Returns true if the geometry has no anomalous geometric points.", + "ST_IsSimple(geometry)", + ) + .with_argument("geom", "geometry") + .build() + })) + } +} + +fn is_simple_impl(args: ScalarFunctionArgs) -> GeoDataFusionResult { + let arrays = ColumnarValue::values_to_arrays(&args.args)?; + let geo_array = from_arrow_array(&arrays[0], &args.arg_fields[0])?; + let result = is_simple_array(&geo_array)?; + Ok(ColumnarValue::Array(Arc::new(result) as ArrayRef)) +} + +fn is_simple_array(array: &dyn GeoArrowArray) -> GeoArrowResult { + downcast_geoarrow_array!(array, _is_simple_impl) +} + +fn _is_simple_impl<'a>( + array: &'a impl GeoArrowArrayAccessor<'a>, +) -> GeoArrowResult { + use geoarrow_expr_geo::util::to_geo::geometry_to_geo; + let mut builder = BooleanBuilder::with_capacity(array.len()); + for item in array.iter() { + if let Some(geom) = item { + let geo_geom = geometry_to_geo(&geom?)?; + let simple = is_geometry_simple(&geo_geom); + builder.append_value(simple); + } else { + builder.append_null(); + } + } + Ok(builder.finish()) +} + +fn is_geometry_simple(geom: &geo::Geometry) -> bool { + match geom { + geo::Geometry::Point(_) => true, + geo::Geometry::MultiPoint(mp) => { + // Simple if no two points are equal + for i in 0..mp.0.len() { + for j in (i + 1)..mp.0.len() { + if mp.0[i] == mp.0[j] { + return false; + } + } + } + true + } + geo::Geometry::LineString(ls) => is_linestring_simple(ls), + geo::Geometry::MultiLineString(mls) => mls.0.iter().all(is_linestring_simple), + geo::Geometry::Polygon(_) | geo::Geometry::MultiPolygon(_) => true, + geo::Geometry::GeometryCollection(gc) => gc.0.iter().all(is_geometry_simple), + _ => true, + } +} + +fn is_linestring_simple(ls: &geo::LineString) -> bool { + use geo::line_intersection::{line_intersection, LineIntersection}; + + if ls.0.len() < 2 { + return true; + } + let lines: Vec = ls.lines().collect(); + for i in 0..lines.len() { + // Check against non-adjacent segments + for j in (i + 2)..lines.len() { + // Skip adjacent segments (i and i+1 share an endpoint) + // Also skip the pair of first and last segment if the linestring is closed + // (they share the start/end point by definition) + if i == 0 && j == lines.len() - 1 && is_linestring_closed(ls) { + continue; + } + if let Some(intersection) = line_intersection(lines[i], lines[j]) { + match intersection { + LineIntersection::SinglePoint { intersection, .. } => { + // Shared endpoint between non-adjacent segments is OK + // only if it's at the ends of both segments + let is_endpoint_i = + intersection == lines[i].start || intersection == lines[i].end; + let is_endpoint_j = + intersection == lines[j].start || intersection == lines[j].end; + if !(is_endpoint_i && is_endpoint_j) { + return false; + } + } + LineIntersection::Collinear { .. } => return false, + } + } + } + } + true +} + +// ========================================================================= +// ST_IsRing +// ========================================================================= +#[derive(Debug, Eq, PartialEq, Hash)] +pub struct IsRing; + +impl Default for IsRing { + fn default() -> Self { + Self + } +} + +static IS_RING_DOC: OnceLock = OnceLock::new(); + +impl ScalarUDFImpl for IsRing { + fn as_any(&self) -> &dyn Any { + self + } + + fn name(&self) -> &str { + "st_isring" + } + + fn signature(&self) -> &Signature { + any_single_geometry_type_input() + } + + fn return_type(&self, _arg_types: &[DataType]) -> Result { + Ok(DataType::Boolean) + } + + fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result { + Ok(is_ring_impl(args)?) + } + + fn documentation(&self) -> Option<&Documentation> { + Some(IS_RING_DOC.get_or_init(|| { + Documentation::builder( + DOC_SECTION_OTHER, + "Returns true if the geometry is a closed and simple linestring (a ring).", + "ST_IsRing(geometry)", + ) + .with_argument("geom", "geometry") + .build() + })) + } +} + +fn is_ring_impl(args: ScalarFunctionArgs) -> GeoDataFusionResult { + let arrays = ColumnarValue::values_to_arrays(&args.args)?; + let geo_array = from_arrow_array(&arrays[0], &args.arg_fields[0])?; + let result = is_ring_array(&geo_array)?; + Ok(ColumnarValue::Array(Arc::new(result) as ArrayRef)) +} + +fn is_ring_array(array: &dyn GeoArrowArray) -> GeoArrowResult { + downcast_geoarrow_array!(array, _is_ring_impl) +} + +fn _is_ring_impl<'a>(array: &'a impl GeoArrowArrayAccessor<'a>) -> GeoArrowResult { + use geoarrow_expr_geo::util::to_geo::geometry_to_geo; + let mut builder = BooleanBuilder::with_capacity(array.len()); + for item in array.iter() { + if let Some(geom) = item { + let geo_geom = geometry_to_geo(&geom?)?; + let ring = match &geo_geom { + geo::Geometry::LineString(ls) => { + ls.0.len() >= 4 && is_linestring_closed(ls) && is_linestring_simple(ls) + } + _ => false, + }; + builder.append_value(ring); + } else { + builder.append_null(); + } + } + Ok(builder.finish()) +} + +// ========================================================================= +// ST_IsCollection +// ========================================================================= +#[derive(Debug, Eq, PartialEq, Hash)] +pub struct IsCollection; + +impl Default for IsCollection { + fn default() -> Self { + Self + } +} + +static IS_COLLECTION_DOC: OnceLock = OnceLock::new(); + +impl ScalarUDFImpl for IsCollection { + fn as_any(&self) -> &dyn Any { + self + } + + fn name(&self) -> &str { + "st_iscollection" + } + + fn signature(&self) -> &Signature { + any_single_geometry_type_input() + } + + fn return_type(&self, _arg_types: &[DataType]) -> Result { + Ok(DataType::Boolean) + } + + fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result { + Ok(is_collection_impl(args)?) + } + + fn documentation(&self) -> Option<&Documentation> { + Some(IS_COLLECTION_DOC.get_or_init(|| { + Documentation::builder( + DOC_SECTION_OTHER, + "Returns true if the geometry is a collection type.", + "ST_IsCollection(geometry)", + ) + .with_argument("geom", "geometry") + .build() + })) + } +} + +fn is_collection_impl(args: ScalarFunctionArgs) -> GeoDataFusionResult { + let arrays = ColumnarValue::values_to_arrays(&args.args)?; + let geo_array = from_arrow_array(&arrays[0], &args.arg_fields[0])?; + let result = is_collection_array(&geo_array)?; + Ok(ColumnarValue::Array(Arc::new(result) as ArrayRef)) +} + +fn is_collection_array(array: &dyn GeoArrowArray) -> GeoArrowResult { + downcast_geoarrow_array!(array, _is_collection_impl) +} + +fn _is_collection_impl<'a>( + array: &'a impl GeoArrowArrayAccessor<'a>, +) -> GeoArrowResult { + let mut builder = BooleanBuilder::with_capacity(array.len()); + for item in array.iter() { + if let Some(geom) = item { + let g = geom?; + let is_coll = matches!( + g.as_type(), + GeometryType::MultiPoint(_) + | GeometryType::MultiLineString(_) + | GeometryType::MultiPolygon(_) + | GeometryType::GeometryCollection(_) + ); + builder.append_value(is_coll); + } else { + builder.append_null(); + } + } + Ok(builder.finish()) +} + +// ========================================================================= +// ST_Dimension +// ========================================================================= +#[derive(Debug, Eq, PartialEq, Hash)] +pub struct StDimension; + +impl Default for StDimension { + fn default() -> Self { + Self + } +} + +static DIMENSION_DOC: OnceLock = OnceLock::new(); + +impl ScalarUDFImpl for StDimension { + fn as_any(&self) -> &dyn Any { + self + } + + fn name(&self) -> &str { + "st_dimension" + } + + fn signature(&self) -> &Signature { + any_single_geometry_type_input() + } + + fn return_type(&self, _arg_types: &[DataType]) -> Result { + Ok(DataType::Int32) + } + + fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result { + Ok(dimension_impl(args)?) + } + + fn documentation(&self) -> Option<&Documentation> { + Some(DIMENSION_DOC.get_or_init(|| { + Documentation::builder( + DOC_SECTION_OTHER, + "Returns the topological dimension of the geometry.", + "ST_Dimension(geometry)", + ) + .with_argument("geom", "geometry") + .build() + })) + } +} + +fn dimension_impl(args: ScalarFunctionArgs) -> GeoDataFusionResult { + let arrays = ColumnarValue::values_to_arrays(&args.args)?; + let geo_array = from_arrow_array(&arrays[0], &args.arg_fields[0])?; + let result = dimension_array(&geo_array)?; + Ok(ColumnarValue::Array(Arc::new(result) as ArrayRef)) +} + +fn dimension_array(array: &dyn GeoArrowArray) -> GeoArrowResult { + downcast_geoarrow_array!(array, _dimension_impl) +} + +fn _dimension_impl<'a>(array: &'a impl GeoArrowArrayAccessor<'a>) -> GeoArrowResult { + let mut builder = Int32Builder::with_capacity(array.len()); + for item in array.iter() { + if let Some(geom) = item { + let g = geom?; + let dim = match g.as_type() { + GeometryType::Point(_) | GeometryType::MultiPoint(_) => 0, + GeometryType::LineString(_) + | GeometryType::MultiLineString(_) + | GeometryType::Line(_) => 1, + GeometryType::Polygon(_) + | GeometryType::MultiPolygon(_) + | GeometryType::Triangle(_) + | GeometryType::Rect(_) => 2, + GeometryType::GeometryCollection(gc) => { + let mut max_dim = -1i32; + for g in gc.geometries() { + let d = match g.as_type() { + GeometryType::Point(_) | GeometryType::MultiPoint(_) => 0, + GeometryType::LineString(_) + | GeometryType::MultiLineString(_) => 1, + GeometryType::Polygon(_) + | GeometryType::MultiPolygon(_) => 2, + _ => 0, + }; + if d > max_dim { + max_dim = d; + } + } + max_dim + } + }; + builder.append_value(dim); + } else { + builder.append_null(); + } + } + Ok(builder.finish()) +} + +// ========================================================================= +// ST_NumGeometries +// ========================================================================= +#[derive(Debug, Eq, PartialEq, Hash)] +pub struct NumGeometries; + +impl Default for NumGeometries { + fn default() -> Self { + Self + } +} + +static NUM_GEOMETRIES_DOC: OnceLock = OnceLock::new(); + +impl ScalarUDFImpl for NumGeometries { + fn as_any(&self) -> &dyn Any { + self + } + + fn name(&self) -> &str { + "st_numgeometries" + } + + fn signature(&self) -> &Signature { + any_single_geometry_type_input() + } + + fn return_type(&self, _arg_types: &[DataType]) -> Result { + Ok(DataType::UInt32) + } + + fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result { + Ok(num_geometries_impl(args)?) + } + + fn documentation(&self) -> Option<&Documentation> { + Some(NUM_GEOMETRIES_DOC.get_or_init(|| { + Documentation::builder( + DOC_SECTION_OTHER, + "Returns the number of elements in a geometry collection.", + "ST_NumGeometries(geometry)", + ) + .with_argument("geom", "geometry") + .build() + })) + } +} + +fn num_geometries_impl(args: ScalarFunctionArgs) -> GeoDataFusionResult { + let arrays = ColumnarValue::values_to_arrays(&args.args)?; + let geo_array = from_arrow_array(&arrays[0], &args.arg_fields[0])?; + let result = num_geometries_array(&geo_array)?; + Ok(ColumnarValue::Array(Arc::new(result) as ArrayRef)) +} + +fn num_geometries_array(array: &dyn GeoArrowArray) -> GeoArrowResult { + downcast_geoarrow_array!(array, _num_geometries_impl) +} + +fn _num_geometries_impl<'a>( + array: &'a impl GeoArrowArrayAccessor<'a>, +) -> GeoArrowResult { + let mut builder = UInt32Builder::with_capacity(array.len()); + for item in array.iter() { + if let Some(geom) = item { + let g = geom?; + let count = match g.as_type() { + GeometryType::Point(_) + | GeometryType::LineString(_) + | GeometryType::Polygon(_) => 1u32, + GeometryType::MultiPoint(mp) => mp.num_points() as u32, + GeometryType::MultiLineString(mls) => mls.num_line_strings() as u32, + GeometryType::MultiPolygon(mp) => mp.num_polygons() as u32, + GeometryType::GeometryCollection(gc) => gc.num_geometries() as u32, + _ => 1, + }; + builder.append_value(count); + } else { + builder.append_null(); + } + } + Ok(builder.finish()) +} + +// ========================================================================= +// ST_NRings +// ========================================================================= +#[derive(Debug, Eq, PartialEq, Hash)] +pub struct NRings; + +impl Default for NRings { + fn default() -> Self { + Self + } +} + +static NRINGS_DOC: OnceLock = OnceLock::new(); + +impl ScalarUDFImpl for NRings { + fn as_any(&self) -> &dyn Any { + self + } + + fn name(&self) -> &str { + "st_nrings" + } + + fn signature(&self) -> &Signature { + any_single_geometry_type_input() + } + + fn return_type(&self, _arg_types: &[DataType]) -> Result { + Ok(DataType::UInt32) + } + + fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result { + Ok(nrings_impl(args)?) + } + + fn documentation(&self) -> Option<&Documentation> { + Some(NRINGS_DOC.get_or_init(|| { + Documentation::builder( + DOC_SECTION_OTHER, + "Returns the total number of rings in a geometry.", + "ST_NRings(geometry)", + ) + .with_argument("geom", "geometry") + .build() + })) + } +} + +fn nrings_impl(args: ScalarFunctionArgs) -> GeoDataFusionResult { + let arrays = ColumnarValue::values_to_arrays(&args.args)?; + let geo_array = from_arrow_array(&arrays[0], &args.arg_fields[0])?; + let result = nrings_array(&geo_array)?; + Ok(ColumnarValue::Array(Arc::new(result) as ArrayRef)) +} + +fn nrings_array(array: &dyn GeoArrowArray) -> GeoArrowResult { + downcast_geoarrow_array!(array, _nrings_impl) +} + +fn _nrings_impl<'a>(array: &'a impl GeoArrowArrayAccessor<'a>) -> GeoArrowResult { + let mut builder = UInt32Builder::with_capacity(array.len()); + for item in array.iter() { + if let Some(geom) = item { + let g = geom?; + let count = count_rings(&g); + builder.append_value(count); + } else { + builder.append_null(); + } + } + Ok(builder.finish()) +} + +fn count_rings(geom: &impl GeometryTrait) -> u32 { + match geom.as_type() { + GeometryType::Polygon(p) => { + let exterior = if p.exterior().is_some() { 1u32 } else { 0 }; + exterior + p.num_interiors() as u32 + } + GeometryType::MultiPolygon(mp) => { + (0..mp.num_polygons()) + .filter_map(|i| mp.polygon(i)) + .map(|p| count_rings_polygon(&p)) + .sum() + } + GeometryType::GeometryCollection(gc) => gc.geometries().map(|g| count_rings(&g)).sum(), + _ => 0, + } +} + +fn count_rings_polygon(p: &impl PolygonTrait) -> u32 { + let exterior = if p.exterior().is_some() { 1u32 } else { 0 }; + exterior + p.num_interiors() as u32 +} + +// ========================================================================= +// ST_HasZ +// ========================================================================= +#[derive(Debug, Eq, PartialEq, Hash)] +pub struct HasZ; + +impl Default for HasZ { + fn default() -> Self { + Self + } +} + +static HASZ_DOC: OnceLock = OnceLock::new(); + +impl ScalarUDFImpl for HasZ { + fn as_any(&self) -> &dyn Any { + self + } + + fn name(&self) -> &str { + "st_hasz" + } + + fn signature(&self) -> &Signature { + any_single_geometry_type_input() + } + + fn return_type(&self, _arg_types: &[DataType]) -> Result { + Ok(DataType::Boolean) + } + + fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result { + let arrays = ColumnarValue::values_to_arrays(&args.args)?; + let geo_array = from_arrow_array(&arrays[0], &args.arg_fields[0]) + .map_err(|e| datafusion::error::DataFusionError::Internal(e.to_string()))?; + let dim = geo_array.data_type().dimension(); + let has_z = matches!(dim, Some(Dimension::XYZ) | Some(Dimension::XYZM)); + let result = BooleanArray::from(vec![Some(has_z); arrays[0].len()]); + Ok(ColumnarValue::Array(Arc::new(result))) + } + + fn documentation(&self) -> Option<&Documentation> { + Some(HASZ_DOC.get_or_init(|| { + Documentation::builder( + DOC_SECTION_OTHER, + "Returns true if the geometry has a Z coordinate dimension.", + "ST_HasZ(geometry)", + ) + .with_argument("geom", "geometry") + .build() + })) + } +} + +// ========================================================================= +// ST_HasM +// ========================================================================= +#[derive(Debug, Eq, PartialEq, Hash)] +pub struct HasM; + +impl Default for HasM { + fn default() -> Self { + Self + } +} + +static HASM_DOC: OnceLock = OnceLock::new(); + +impl ScalarUDFImpl for HasM { + fn as_any(&self) -> &dyn Any { + self + } + + fn name(&self) -> &str { + "st_hasm" + } + + fn signature(&self) -> &Signature { + any_single_geometry_type_input() + } + + fn return_type(&self, _arg_types: &[DataType]) -> Result { + Ok(DataType::Boolean) + } + + fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result { + let arrays = ColumnarValue::values_to_arrays(&args.args)?; + let geo_array = from_arrow_array(&arrays[0], &args.arg_fields[0]) + .map_err(|e| datafusion::error::DataFusionError::Internal(e.to_string()))?; + let dim = geo_array.data_type().dimension(); + let has_m = matches!(dim, Some(Dimension::XYM) | Some(Dimension::XYZM)); + let result = BooleanArray::from(vec![Some(has_m); arrays[0].len()]); + Ok(ColumnarValue::Array(Arc::new(result))) + } + + fn documentation(&self) -> Option<&Documentation> { + Some(HASM_DOC.get_or_init(|| { + Documentation::builder( + DOC_SECTION_OTHER, + "Returns true if the geometry has an M (measure) coordinate dimension.", + "ST_HasM(geometry)", + ) + .with_argument("geom", "geometry") + .build() + })) + } +} + +#[cfg(test)] +mod test { + use arrow_array::cast::AsArray; + use arrow_array::types::{Int32Type, UInt32Type}; + use datafusion::prelude::SessionContext; + + use super::*; + use crate::udf::native::io::GeomFromText; + + #[tokio::test] + async fn test_is_empty_point() { + let ctx = SessionContext::new(); + ctx.register_udf(IsEmpty.into()); + ctx.register_udf(GeomFromText::default().into()); + + let df = ctx + .sql("SELECT ST_IsEmpty(ST_GeomFromText('POINT(0 0)'));") + .await + .unwrap(); + let batch = df.collect().await.unwrap().into_iter().next().unwrap(); + assert!(!batch.column(0).as_boolean().value(0)); + } + + #[tokio::test] + async fn test_is_closed_ring() { + let ctx = SessionContext::new(); + ctx.register_udf(IsClosed.into()); + ctx.register_udf(GeomFromText::default().into()); + + let df = ctx + .sql("SELECT ST_IsClosed(ST_GeomFromText('LINESTRING(0 0, 1 1, 1 0, 0 0)'));") + .await + .unwrap(); + let batch = df.collect().await.unwrap().into_iter().next().unwrap(); + assert!(batch.column(0).as_boolean().value(0)); + } + + #[tokio::test] + async fn test_is_closed_open() { + let ctx = SessionContext::new(); + ctx.register_udf(IsClosed.into()); + ctx.register_udf(GeomFromText::default().into()); + + let df = ctx + .sql("SELECT ST_IsClosed(ST_GeomFromText('LINESTRING(0 0, 1 1, 1 0)'));") + .await + .unwrap(); + let batch = df.collect().await.unwrap().into_iter().next().unwrap(); + assert!(!batch.column(0).as_boolean().value(0)); + } + + #[tokio::test] + async fn test_dimension() { + let ctx = SessionContext::new(); + ctx.register_udf(StDimension.into()); + ctx.register_udf(GeomFromText::default().into()); + + let df = ctx + .sql("SELECT ST_Dimension(ST_GeomFromText('POLYGON((0 0, 1 0, 1 1, 0 1, 0 0))'));") + .await + .unwrap(); + let batch = df.collect().await.unwrap().into_iter().next().unwrap(); + assert_eq!(batch.column(0).as_primitive::().value(0), 2); + } + + #[tokio::test] + async fn test_num_geometries_multi() { + let ctx = SessionContext::new(); + ctx.register_udf(NumGeometries.into()); + ctx.register_udf(GeomFromText::default().into()); + + let df = ctx + .sql("SELECT ST_NumGeometries(ST_GeomFromText('MULTIPOINT(0 0, 1 1, 2 2)'));") + .await + .unwrap(); + let batch = df.collect().await.unwrap().into_iter().next().unwrap(); + assert_eq!(batch.column(0).as_primitive::().value(0), 3); + } +} diff --git a/rust/geodatafusion/src/udf/native/accessors/mod.rs b/rust/geodatafusion/src/udf/native/accessors/mod.rs index 5a716ca..6532a63 100644 --- a/rust/geodatafusion/src/udf/native/accessors/mod.rs +++ b/rust/geodatafusion/src/udf/native/accessors/mod.rs @@ -1,4 +1,5 @@ mod coord_dim; +mod geometry_accessors; mod geometry_type; mod line_string; mod npoints; @@ -6,6 +7,10 @@ mod num_interior_rings; mod point; pub use coord_dim::{CoordDim, NDims}; +pub use geometry_accessors::{ + HasM, HasZ, IsClosed, IsCollection, IsEmpty, IsRing, IsSimple, NRings, NumGeometries, + StDimension, +}; pub use geometry_type::{GeometryType, ST_GeometryType}; pub use line_string::{EndPoint, StartPoint}; pub use npoints::NPoints; @@ -14,13 +19,23 @@ pub use point::{M, X, Y, Z}; pub fn register(session_context: &datafusion::prelude::SessionContext) { session_context.register_udf(CoordDim.into()); + session_context.register_udf(StDimension.into()); session_context.register_udf(NDims.into()); session_context.register_udf(GeometryType.into()); session_context.register_udf(ST_GeometryType.into()); session_context.register_udf(EndPoint::default().into()); - session_context.register_udf(StartPoint::default().into()); + session_context.register_udf(HasM.into()); + session_context.register_udf(HasZ.into()); + session_context.register_udf(IsClosed.into()); + session_context.register_udf(IsCollection.into()); + session_context.register_udf(IsEmpty.into()); + session_context.register_udf(IsRing.into()); + session_context.register_udf(IsSimple.into()); session_context.register_udf(NPoints.into()); + session_context.register_udf(NRings.into()); + session_context.register_udf(NumGeometries.into()); session_context.register_udf(NumInteriorRings.into()); + session_context.register_udf(StartPoint::default().into()); session_context.register_udf(M::default().into()); session_context.register_udf(X::default().into()); session_context.register_udf(Y::default().into()); diff --git a/rust/geodatafusion/src/udf/native/constructors/make.rs b/rust/geodatafusion/src/udf/native/constructors/make.rs new file mode 100644 index 0000000..54a5208 --- /dev/null +++ b/rust/geodatafusion/src/udf/native/constructors/make.rs @@ -0,0 +1,584 @@ +//! Line, Polygon, and Envelope constructors + +use std::any::Any; +use std::sync::{Arc, OnceLock}; + +use arrow_array::cast::AsArray; +use arrow_array::types::Float64Type; +use arrow_array::Array; +use arrow_schema::{DataType, FieldRef}; +use datafusion::error::{DataFusionError, Result}; +use datafusion::logical_expr::scalar_doc_sections::DOC_SECTION_OTHER; +use datafusion::logical_expr::{ + ColumnarValue, Documentation, ReturnFieldArgs, ScalarFunctionArgs, ScalarUDFImpl, Signature, + TypeSignature, Volatility, +}; +use geoarrow_array::array::from_arrow_array; +use geoarrow_array::builder::GeometryBuilder; +use geoarrow_array::{GeoArrowArray, GeoArrowArrayAccessor, downcast_geoarrow_array}; +use geoarrow_expr_geo::util::to_geo::geometry_to_geo; +use geoarrow_schema::error::GeoArrowResult; +use geoarrow_schema::{CoordType, GeometryType, Metadata}; + +use crate::data_types::any_single_geometry_type_input; +use crate::error::GeoDataFusionResult; + +// --------------------------------------------------------------------------- +// ST_MakeLine +// --------------------------------------------------------------------------- + +#[derive(Debug, Eq, PartialEq, Hash)] +pub struct MakeLine { + signature: Signature, + coord_type: CoordType, +} + +impl MakeLine { + pub fn new(coord_type: CoordType) -> Self { + Self { + signature: Signature::any(2, Volatility::Immutable), + coord_type, + } + } +} + +impl Default for MakeLine { + fn default() -> Self { + Self::new(Default::default()) + } +} + +static MAKE_LINE_DOC: OnceLock = OnceLock::new(); + +impl ScalarUDFImpl for MakeLine { + fn as_any(&self) -> &dyn Any { + self + } + + fn name(&self) -> &str { + "st_makeline" + } + + fn signature(&self) -> &Signature { + &self.signature + } + + fn return_type(&self, _arg_types: &[DataType]) -> Result { + Err(DataFusionError::Internal("return_type".to_string())) + } + + fn return_field_from_args(&self, args: ReturnFieldArgs) -> Result { + let metadata = + Arc::new(Metadata::try_from(args.arg_fields[0].as_ref()).unwrap_or_default()); + let output_type = GeometryType::new(metadata).with_coord_type(self.coord_type); + Ok(Arc::new(output_type.to_field("", true))) + } + + fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result { + Ok(make_line_impl(args, self.coord_type)?) + } + + fn documentation(&self) -> Option<&Documentation> { + Some(MAKE_LINE_DOC.get_or_init(|| { + Documentation::builder( + DOC_SECTION_OTHER, + "Creates a LineString containing the points of two input geometries. Other geometries are decomposed into their component points.", + "ST_MakeLine(geom1, geom2)", + ) + .with_argument("geom1", "geometry") + .with_argument("geom2", "geometry") + .build() + })) + } +} + +fn make_line_impl( + args: ScalarFunctionArgs, + coord_type: CoordType, +) -> GeoDataFusionResult { + let arrays = ColumnarValue::values_to_arrays(&args.args)?; + let left_array = from_arrow_array(&arrays[0], &args.arg_fields[0])?; + let right_array = from_arrow_array(&arrays[1], &args.arg_fields[1])?; + let result = make_line_arrays(&left_array, &right_array, coord_type)?; + Ok(ColumnarValue::Array(result.into_array_ref())) +} + +fn make_line_arrays( + left: &dyn GeoArrowArray, + right: &dyn GeoArrowArray, + coord_type: CoordType, +) -> GeoArrowResult> { + downcast_geoarrow_array!(left, _make_line_left, right, coord_type) +} + +fn _make_line_left<'a>( + left: &'a impl GeoArrowArrayAccessor<'a>, + right: &dyn GeoArrowArray, + coord_type: CoordType, +) -> GeoArrowResult> { + downcast_geoarrow_array!(right, _make_line_both, left, coord_type) +} + +fn _make_line_both<'a, 'b>( + right: &'b impl GeoArrowArrayAccessor<'b>, + left: &'a impl GeoArrowArrayAccessor<'a>, + coord_type: CoordType, +) -> GeoArrowResult> { + let geom_type = GeometryType::new(Default::default()).with_coord_type(coord_type); + let mut builder = GeometryBuilder::new(geom_type); + + for (l, r) in left.iter().zip(right.iter()) { + match (l, r) { + (Some(left_geom), Some(right_geom)) => { + let left_geo = geometry_to_geo(&left_geom?)?; + let right_geo = geometry_to_geo(&right_geom?)?; + + let mut coords = collect_points(&left_geo); + coords.extend(collect_points(&right_geo)); + + if coords.len() >= 2 { + let line = geo::LineString::new(coords); + builder.push_geometry(Some(&geo::Geometry::LineString(line)))?; + } else { + builder.push_null(); + } + } + _ => builder.push_null(), + } + } + + Ok(Arc::new(builder.finish())) +} + +/// Extract all coordinate points from a geometry. +fn collect_points(geom: &geo::Geometry) -> Vec { + use geo::CoordsIter; + geom.coords_iter().collect() +} + +// --------------------------------------------------------------------------- +// ST_MakePolygon +// --------------------------------------------------------------------------- + +#[derive(Debug, Eq, PartialEq, Hash)] +pub struct MakePolygon { + coord_type: CoordType, +} + +impl MakePolygon { + pub fn new(coord_type: CoordType) -> Self { + Self { coord_type } + } +} + +impl Default for MakePolygon { + fn default() -> Self { + Self::new(Default::default()) + } +} + +static MAKE_POLYGON_DOC: OnceLock = OnceLock::new(); + +impl ScalarUDFImpl for MakePolygon { + fn as_any(&self) -> &dyn Any { + self + } + + fn name(&self) -> &str { + "st_makepolygon" + } + + fn signature(&self) -> &Signature { + any_single_geometry_type_input() + } + + fn return_type(&self, _arg_types: &[DataType]) -> Result { + Err(DataFusionError::Internal("return_type".to_string())) + } + + fn return_field_from_args(&self, args: ReturnFieldArgs) -> Result { + let metadata = + Arc::new(Metadata::try_from(args.arg_fields[0].as_ref()).unwrap_or_default()); + let output_type = GeometryType::new(metadata).with_coord_type(self.coord_type); + Ok(Arc::new(output_type.to_field("", true))) + } + + fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result { + Ok(make_polygon_impl(args, self.coord_type)?) + } + + fn documentation(&self) -> Option<&Documentation> { + Some(MAKE_POLYGON_DOC.get_or_init(|| { + Documentation::builder( + DOC_SECTION_OTHER, + "Creates a Polygon from a closed LineString shell.", + "ST_MakePolygon(linestring)", + ) + .with_argument("shell", "A closed LineString geometry") + .build() + })) + } +} + +fn make_polygon_impl( + args: ScalarFunctionArgs, + coord_type: CoordType, +) -> GeoDataFusionResult { + let arrays = ColumnarValue::values_to_arrays(&args.args)?; + let geo_array = from_arrow_array(&arrays[0], &args.arg_fields[0])?; + let result = make_polygon_array(&geo_array, coord_type)?; + Ok(ColumnarValue::Array(result.into_array_ref())) +} + +fn make_polygon_array( + array: &dyn GeoArrowArray, + coord_type: CoordType, +) -> GeoArrowResult> { + downcast_geoarrow_array!(array, _make_polygon_impl, coord_type) +} + +fn _make_polygon_impl<'a>( + array: &'a impl GeoArrowArrayAccessor<'a>, + coord_type: CoordType, +) -> GeoArrowResult> { + let geom_type = GeometryType::new(Default::default()).with_coord_type(coord_type); + let mut builder = GeometryBuilder::new(geom_type); + + for item in array.iter() { + if let Some(geom) = item { + let geo_geom = geometry_to_geo(&geom?)?; + match geo_geom { + geo::Geometry::LineString(mut ls) => { + // Auto-close if not already closed (matches PostGIS behavior) + if ls.0.len() >= 2 && ls.0.first() != ls.0.last() { + ls.0.push(ls.0[0]); + } + let polygon = geo::Polygon::new(ls, vec![]); + builder.push_geometry(Some(&geo::Geometry::Polygon(polygon)))?; + } + _ => { + // For non-linestring inputs, push null + builder.push_null(); + } + } + } else { + builder.push_null(); + } + } + + Ok(Arc::new(builder.finish())) +} + +// --------------------------------------------------------------------------- +// ST_MakeEnvelope +// --------------------------------------------------------------------------- + +#[derive(Debug, Eq, PartialEq, Hash)] +pub struct MakeEnvelope { + signature: Signature, + coord_type: CoordType, +} + +impl MakeEnvelope { + pub fn new(coord_type: CoordType) -> Self { + Self { + signature: Signature::one_of( + vec![ + TypeSignature::Exact(vec![ + DataType::Float64, + DataType::Float64, + DataType::Float64, + DataType::Float64, + ]), + TypeSignature::Exact(vec![ + DataType::Float64, + DataType::Float64, + DataType::Float64, + DataType::Float64, + DataType::Int64, + ]), + ], + Volatility::Immutable, + ), + coord_type, + } + } +} + +impl Default for MakeEnvelope { + fn default() -> Self { + Self::new(Default::default()) + } +} + +static MAKE_ENVELOPE_DOC: OnceLock = OnceLock::new(); + +impl ScalarUDFImpl for MakeEnvelope { + fn as_any(&self) -> &dyn Any { + self + } + + fn name(&self) -> &str { + "st_makeenvelope" + } + + fn signature(&self) -> &Signature { + &self.signature + } + + fn return_type(&self, _arg_types: &[DataType]) -> Result { + Err(DataFusionError::Internal("return_type".to_string())) + } + + fn return_field_from_args(&self, args: ReturnFieldArgs) -> Result { + let mut geom_type = + GeometryType::new(Default::default()).with_coord_type(self.coord_type); + + // If a 5th argument (SRID) is provided as scalar, use it + if let Some(srid) = args.scalar_arguments.get(4) { + if let Some(datafusion::scalar::ScalarValue::Int64(srid_val)) = srid { + let crs = geoarrow_schema::Crs::from_authority_code(format!( + "EPSG:{}", + match srid_val { + Some(v) => v, + None => return Ok(Arc::new(geom_type.to_field("", true))), + } + )); + geom_type = geom_type + .with_metadata(Arc::new(Metadata::new(crs, Default::default()))); + } + } + + Ok(Arc::new(geom_type.to_field("", true))) + } + + fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result { + Ok(make_envelope_impl(args, self.coord_type)?) + } + + fn documentation(&self) -> Option<&Documentation> { + Some(MAKE_ENVELOPE_DOC.get_or_init(|| { + Documentation::builder( + DOC_SECTION_OTHER, + "Creates a rectangular Polygon from minimum and maximum coordinate values. The polygon is formed in the coordinate plane with corners at (xmin, ymin), (xmin, ymax), (xmax, ymax), (xmax, ymin), (xmin, ymin).", + "ST_MakeEnvelope(xmin, ymin, xmax, ymax) or ST_MakeEnvelope(xmin, ymin, xmax, ymax, srid)", + ) + .with_argument("xmin", "minimum x value") + .with_argument("ymin", "minimum y value") + .with_argument("xmax", "maximum x value") + .with_argument("ymax", "maximum y value") + .with_argument("srid", "optional integer SRID value") + .build() + })) + } +} + +fn make_envelope_impl( + args: ScalarFunctionArgs, + coord_type: CoordType, +) -> GeoDataFusionResult { + let arrays = ColumnarValue::values_to_arrays(&args.args[..4])?; + let xmin = arrays[0].as_primitive::(); + let ymin = arrays[1].as_primitive::(); + let xmax = arrays[2].as_primitive::(); + let ymax = arrays[3].as_primitive::(); + + let geom_type = GeometryType::new(Default::default()).with_coord_type(coord_type); + let mut builder = GeometryBuilder::new(geom_type); + + for i in 0..xmin.len() { + if xmin.is_null(i) || ymin.is_null(i) || xmax.is_null(i) || ymax.is_null(i) { + builder.push_null(); + } else { + let x1 = xmin.value(i); + let y1 = ymin.value(i); + let x2 = xmax.value(i); + let y2 = ymax.value(i); + + let polygon = geo::Polygon::new( + geo::LineString::from(vec![ + (x1, y1), + (x1, y2), + (x2, y2), + (x2, y1), + (x1, y1), + ]), + vec![], + ); + builder.push_geometry(Some(&geo::Geometry::Polygon(polygon)))?; + } + } + + Ok(ColumnarValue::Array(builder.finish().to_array_ref())) +} + +#[cfg(test)] +mod test { + use std::sync::Arc; + + use arrow_array::{ArrayRef, Float64Array, RecordBatch}; + use arrow_schema::{Field, Schema}; + use datafusion::prelude::SessionContext; + use geoarrow_schema::Crs; + + use super::*; + use crate::udf::native::io::{AsText, GeomFromText}; + + #[tokio::test] + async fn test_make_line() { + let ctx = SessionContext::new(); + + ctx.register_udf(MakeLine::default().into()); + ctx.register_udf(GeomFromText::default().into()); + ctx.register_udf(AsText.into()); + + let sql_df = ctx + .sql(r#"SELECT ST_AsText(ST_MakeLine(ST_GeomFromText('POINT(0 0)'), ST_GeomFromText('POINT(1 1)')));"#) + .await + .unwrap(); + + let output_batches = sql_df.collect().await.unwrap(); + assert_eq!(output_batches.len(), 1); + let output_batch = &output_batches[0]; + + let str_arr = output_batch + .column(0) + .as_any() + .downcast_ref::() + .unwrap(); + let wkt = str_arr.value(0); + assert!( + wkt.contains("LINESTRING"), + "Expected LINESTRING in output, got: {}", + wkt + ); + } + + #[tokio::test] + async fn test_make_polygon() { + let ctx = SessionContext::new(); + + ctx.register_udf(MakePolygon::default().into()); + ctx.register_udf(GeomFromText::default().into()); + ctx.register_udf(AsText.into()); + + let sql_df = ctx + .sql(r#"SELECT ST_AsText(ST_MakePolygon(ST_GeomFromText('LINESTRING(0 0, 10 0, 10 10, 0 10, 0 0)')));"#) + .await + .unwrap(); + + let output_batches = sql_df.collect().await.unwrap(); + assert_eq!(output_batches.len(), 1); + let output_batch = &output_batches[0]; + + let str_arr = output_batch + .column(0) + .as_any() + .downcast_ref::() + .unwrap(); + let wkt = str_arr.value(0); + assert!( + wkt.contains("POLYGON"), + "Expected POLYGON in output, got: {}", + wkt + ); + } + + #[tokio::test] + async fn test_make_envelope() { + let ctx = SessionContext::new(); + + ctx.register_udf(MakeEnvelope::default().into()); + ctx.register_udf(AsText.into()); + + let sql_df = ctx + .sql(r#"SELECT ST_AsText(ST_MakeEnvelope(0.0, 0.0, 1.0, 1.0));"#) + .await + .unwrap(); + + let output_batches = sql_df.collect().await.unwrap(); + assert_eq!(output_batches.len(), 1); + let output_batch = &output_batches[0]; + + let str_arr = output_batch + .column(0) + .as_any() + .downcast_ref::() + .unwrap(); + let wkt = str_arr.value(0); + assert!( + wkt.contains("POLYGON"), + "Expected POLYGON in output, got: {}", + wkt + ); + } + + #[tokio::test] + async fn test_make_envelope_with_srid() { + let ctx = SessionContext::new(); + + ctx.register_udf(MakeEnvelope::default().into()); + + let sql_df = ctx + .sql(r#"SELECT ST_MakeEnvelope(0.0, 0.0, 1.0, 1.0, 4326);"#) + .await + .unwrap(); + + let output_batches = sql_df.collect().await.unwrap(); + assert_eq!(output_batches.len(), 1); + let output_batch = &output_batches[0]; + let output_schema = output_batch.schema(); + let output_field = output_schema.field(0); + + let geom_type = output_field.try_extension_type::().unwrap(); + assert_eq!( + geom_type.metadata().crs(), + &Crs::from_authority_code("EPSG:4326".to_string()) + ); + } + + #[tokio::test] + async fn test_make_envelope_from_table() { + let ctx = SessionContext::new(); + + ctx.register_udf(MakeEnvelope::default().into()); + ctx.register_udf(AsText.into()); + + let xmin: ArrayRef = Arc::new(Float64Array::from(vec![0.0])); + let ymin: ArrayRef = Arc::new(Float64Array::from(vec![0.0])); + let xmax: ArrayRef = Arc::new(Float64Array::from(vec![10.0])); + let ymax: ArrayRef = Arc::new(Float64Array::from(vec![10.0])); + + let schema = Schema::new([ + Arc::new(Field::new("xmin", DataType::Float64, true)), + Arc::new(Field::new("ymin", DataType::Float64, true)), + Arc::new(Field::new("xmax", DataType::Float64, true)), + Arc::new(Field::new("ymax", DataType::Float64, true)), + ]); + let batch = + RecordBatch::try_new(Arc::new(schema), vec![xmin, ymin, xmax, ymax]).unwrap(); + + ctx.register_batch("t", batch).unwrap(); + + let sql_df = ctx + .sql("SELECT ST_AsText(ST_MakeEnvelope(xmin, ymin, xmax, ymax)) FROM t;") + .await + .unwrap(); + + let output_batches = sql_df.collect().await.unwrap(); + assert_eq!(output_batches.len(), 1); + let output_batch = &output_batches[0]; + + let str_arr = output_batch + .column(0) + .as_any() + .downcast_ref::() + .unwrap(); + let wkt = str_arr.value(0); + assert!( + wkt.contains("POLYGON"), + "Expected POLYGON in output, got: {}", + wkt + ); + } +} diff --git a/rust/geodatafusion/src/udf/native/constructors/mod.rs b/rust/geodatafusion/src/udf/native/constructors/mod.rs index 18438bb..d153f22 100644 --- a/rust/geodatafusion/src/udf/native/constructors/mod.rs +++ b/rust/geodatafusion/src/udf/native/constructors/mod.rs @@ -1,5 +1,7 @@ +mod make; mod point; +pub use make::{MakeEnvelope, MakeLine, MakePolygon}; pub use point::{MakePoint, MakePointM, Point, PointM, PointZ, PointZM}; pub fn register(session_context: &datafusion::prelude::SessionContext) { @@ -9,4 +11,7 @@ pub fn register(session_context: &datafusion::prelude::SessionContext) { session_context.register_udf(PointM::default().into()); session_context.register_udf(PointZ::default().into()); session_context.register_udf(PointZM::default().into()); + session_context.register_udf(MakeLine::default().into()); + session_context.register_udf(MakePolygon::default().into()); + session_context.register_udf(MakeEnvelope::default().into()); } diff --git a/rust/geodatafusion/src/udf/native/io/ewkt.rs b/rust/geodatafusion/src/udf/native/io/ewkt.rs new file mode 100644 index 0000000..735e498 --- /dev/null +++ b/rust/geodatafusion/src/udf/native/io/ewkt.rs @@ -0,0 +1,232 @@ +use std::any::Any; +use std::sync::{Arc, OnceLock}; + +use arrow_array::builder::StringBuilder; +use arrow_array::Array; +use arrow_schema::{DataType, Field}; +use datafusion::error::Result; +use datafusion::logical_expr::scalar_doc_sections::DOC_SECTION_OTHER; +use datafusion::logical_expr::{ + ColumnarValue, Documentation, ReturnFieldArgs, ScalarFunctionArgs, ScalarUDFImpl, Signature, +}; +use geoarrow_array::array::from_arrow_array; +use geoarrow_array::cast::to_wkt; +use geoarrow_array::GeoArrowArray; +use geoarrow_schema::{CrsType, Metadata}; + +use crate::data_types::any_single_geometry_type_input; +use crate::error::GeoDataFusionResult; + +// --------------------------------------------------------------------------- +// ST_AsEWKT +// --------------------------------------------------------------------------- + +#[derive(Debug, Eq, PartialEq, Hash)] +pub struct AsEWKT; + +impl AsEWKT { + pub fn new() -> Self { + Self {} + } + + fn invoke_impl(&self, args: ScalarFunctionArgs) -> GeoDataFusionResult { + let array = &ColumnarValue::values_to_arrays(&args.args)?[0]; + let field = &args.arg_fields[0]; + let geo_array = from_arrow_array(&array, field.as_ref())?; + + // Get WKT representation + let wkt_arr = to_wkt::(geo_array.as_ref())?; + + // Extract SRID from metadata if present + let metadata = Metadata::try_from(field.as_ref()).ok(); + let srid_prefix = metadata + .as_ref() + .and_then(|m| { + let crs = m.crs(); + if crs.crs_type() == Some(CrsType::AuthorityCode) { + if let Some(val) = crs.crs_value() { + if let Some(code_str) = val.as_str() { + if let Some(srid) = code_str.strip_prefix("EPSG:") { + return Some(format!("SRID={};", srid)); + } + } + } + } + None + }) + .unwrap_or_default(); + + // Build output with SRID prefix + let str_arr = wkt_arr.into_array_ref(); + let wkt_string_arr = str_arr + .as_any() + .downcast_ref::() + .unwrap(); + + let mut builder = + StringBuilder::with_capacity(wkt_string_arr.len(), wkt_string_arr.len() * 64); + for i in 0..wkt_string_arr.len() { + if wkt_string_arr.is_null(i) { + builder.append_null(); + } else { + let wkt_val = wkt_string_arr.value(i); + let ewkt = format!("{}{}", srid_prefix, wkt_val); + builder.append_value(&ewkt); + } + } + + Ok(ColumnarValue::Array(Arc::new(builder.finish()))) + } +} + +impl Default for AsEWKT { + fn default() -> Self { + Self::new() + } +} + +static AS_EWKT_DOC: OnceLock = OnceLock::new(); + +impl ScalarUDFImpl for AsEWKT { + fn as_any(&self) -> &dyn Any { + self + } + + fn name(&self) -> &str { + "st_asewkt" + } + + fn signature(&self) -> &Signature { + any_single_geometry_type_input() + } + + fn return_type(&self, _arg_types: &[DataType]) -> Result { + Ok(DataType::Utf8) + } + + fn return_field_from_args(&self, args: ReturnFieldArgs) -> Result> { + let input_field = &args.arg_fields[0]; + Ok(Field::new(input_field.name(), DataType::Utf8, input_field.is_nullable()).into()) + } + + fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result { + Ok(self.invoke_impl(args)?) + } + + fn documentation(&self) -> Option<&Documentation> { + Some(AS_EWKT_DOC.get_or_init(|| { + Documentation::builder( + DOC_SECTION_OTHER, + "Returns the Extended Well-Known Text (EWKT) representation of the geometry with SRID metadata.", + "ST_AsEWKT(geometry)", + ) + .with_argument("g1", "geometry") + .build() + })) + } +} + +#[cfg(test)] +mod test { + use std::sync::Arc; + + use arrow_schema::Schema; + use datafusion::prelude::SessionContext; + use geoarrow_array::test::point; + use geoarrow_schema::{CoordType, Crs, Dimension, Metadata}; + + use super::*; + + #[tokio::test] + async fn test_as_ewkt_with_srid() { + let ctx = SessionContext::new(); + + let crs = Crs::from_authority_code("EPSG:4326".to_string()); + let metadata = Arc::new(Metadata::new(crs, Default::default())); + + let geo_arr = point::array(CoordType::Separated, Dimension::XY).with_metadata(metadata); + + let arr = geo_arr.to_array_ref(); + let field = geo_arr.data_type().to_field("geometry", true); + let schema = Schema::new([Arc::new(field)]); + let batch = + arrow_array::RecordBatch::try_new(Arc::new(schema), vec![arr]).unwrap(); + + ctx.register_batch("t", batch).unwrap(); + + ctx.register_udf(AsEWKT::new().into()); + + let sql_df = ctx + .sql("SELECT ST_AsEWKT(geometry) FROM t;") + .await + .unwrap(); + + let output_batches = sql_df.collect().await.unwrap(); + assert_eq!(output_batches.len(), 1); + let output_batch = &output_batches[0]; + + let str_arr = output_batch + .column(0) + .as_any() + .downcast_ref::() + .unwrap(); + + assert!(str_arr.len() > 0); + let first = str_arr.value(0); + assert!( + first.starts_with("SRID=4326;"), + "Expected EWKT to start with 'SRID=4326;', got: {}", + first + ); + assert!( + first.contains("POINT"), + "Expected EWKT to contain 'POINT', got: {}", + first + ); + } + + #[tokio::test] + async fn test_as_ewkt_without_srid() { + let ctx = SessionContext::new(); + + let geo_arr = point::array(CoordType::Separated, Dimension::XY); + + let arr = geo_arr.to_array_ref(); + let field = geo_arr.data_type().to_field("geometry", true); + let schema = Schema::new([Arc::new(field)]); + let batch = + arrow_array::RecordBatch::try_new(Arc::new(schema), vec![arr]).unwrap(); + + ctx.register_batch("t", batch).unwrap(); + + ctx.register_udf(AsEWKT::new().into()); + + let sql_df = ctx + .sql("SELECT ST_AsEWKT(geometry) FROM t;") + .await + .unwrap(); + + let output_batches = sql_df.collect().await.unwrap(); + assert_eq!(output_batches.len(), 1); + let output_batch = &output_batches[0]; + + let str_arr = output_batch + .column(0) + .as_any() + .downcast_ref::() + .unwrap(); + + assert!(str_arr.len() > 0); + let first = str_arr.value(0); + assert!( + !first.starts_with("SRID="), + "Expected no SRID prefix when no CRS is set, got: {}", + first + ); + assert!( + first.contains("POINT"), + "Expected EWKT to contain 'POINT', got: {}", + first + ); + } +} diff --git a/rust/geodatafusion/src/udf/native/io/geojson.rs b/rust/geodatafusion/src/udf/native/io/geojson.rs new file mode 100644 index 0000000..ba749f0 --- /dev/null +++ b/rust/geodatafusion/src/udf/native/io/geojson.rs @@ -0,0 +1,379 @@ +use std::any::Any; +use std::str::FromStr; +use std::sync::{Arc, OnceLock}; + +use arrow_array::builder::StringBuilder; +use arrow_array::Array; +use arrow_schema::{DataType, Field}; +use datafusion::error::{DataFusionError, Result}; +use datafusion::logical_expr::scalar_doc_sections::DOC_SECTION_OTHER; +use datafusion::logical_expr::{ + ColumnarValue, Documentation, ReturnFieldArgs, ScalarFunctionArgs, ScalarUDFImpl, Signature, + Volatility, +}; +use geoarrow_array::array::from_arrow_array; +use geoarrow_array::builder::GeometryBuilder; +use geoarrow_array::{GeoArrowArray, GeoArrowArrayAccessor, downcast_geoarrow_array}; +use geoarrow_expr_geo::util::to_geo::geometry_to_geo; +use geoarrow_schema::error::GeoArrowResult; +use geoarrow_schema::{CoordType, GeometryType, Metadata}; + +use crate::data_types::any_single_geometry_type_input; +use crate::error::GeoDataFusionResult; + +// --------------------------------------------------------------------------- +// ST_AsGeoJSON +// --------------------------------------------------------------------------- + +#[derive(Debug, Eq, PartialEq, Hash)] +pub struct AsGeoJSON; + +impl AsGeoJSON { + pub fn new() -> Self { + Self {} + } + + fn invoke_impl(&self, args: ScalarFunctionArgs) -> GeoDataFusionResult { + let array = &ColumnarValue::values_to_arrays(&args.args)?[0]; + let field = &args.arg_fields[0]; + let geo_array = from_arrow_array(&array, field.as_ref())?; + let result = as_geojson_array(&geo_array)?; + Ok(ColumnarValue::Array(Arc::new(result))) + } +} + +impl Default for AsGeoJSON { + fn default() -> Self { + Self::new() + } +} + +static AS_GEOJSON_DOC: OnceLock = OnceLock::new(); + +impl ScalarUDFImpl for AsGeoJSON { + fn as_any(&self) -> &dyn Any { + self + } + + fn name(&self) -> &str { + "st_asgeojson" + } + + fn signature(&self) -> &Signature { + any_single_geometry_type_input() + } + + fn return_type(&self, _arg_types: &[DataType]) -> Result { + Ok(DataType::Utf8) + } + + fn return_field_from_args(&self, args: ReturnFieldArgs) -> Result> { + let input_field = &args.arg_fields[0]; + Ok(Field::new(input_field.name(), DataType::Utf8, input_field.is_nullable()).into()) + } + + fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result { + Ok(self.invoke_impl(args)?) + } + + fn documentation(&self) -> Option<&Documentation> { + Some(AS_GEOJSON_DOC.get_or_init(|| { + Documentation::builder( + DOC_SECTION_OTHER, + "Returns the geometry as a GeoJSON element.", + "ST_AsGeoJSON(geometry)", + ) + .with_argument("g1", "geometry") + .build() + })) + } +} + +fn as_geojson_array( + array: &dyn GeoArrowArray, +) -> GeoArrowResult { + downcast_geoarrow_array!(array, _as_geojson_impl) +} + +fn _as_geojson_impl<'a>( + array: &'a impl GeoArrowArrayAccessor<'a>, +) -> GeoArrowResult { + let mut builder = StringBuilder::with_capacity(array.len(), array.len() * 64); + for item in array.iter() { + if let Some(geom) = item { + let geo_geom = geometry_to_geo(&geom?)?; + let geojson_value = geojson::Value::from(&geo_geom); + let geojson_geom = geojson::Geometry::new(geojson_value); + let json_string = serde_json::to_string(&geojson_geom) + .map_err(|e| geoarrow_schema::error::GeoArrowError::InvalidGeoArrow(e.to_string()))?; + builder.append_value(&json_string); + } else { + builder.append_null(); + } + } + Ok(builder.finish()) +} + +// --------------------------------------------------------------------------- +// ST_GeomFromGeoJSON +// --------------------------------------------------------------------------- + +#[derive(Debug, Eq, PartialEq, Hash)] +pub struct GeomFromGeoJSON { + signature: Signature, + coord_type: CoordType, +} + +impl GeomFromGeoJSON { + pub fn new(coord_type: CoordType) -> Self { + Self { + signature: Signature::uniform( + 1, + vec![DataType::Utf8, DataType::LargeUtf8, DataType::Utf8View], + Volatility::Immutable, + ), + coord_type, + } + } + + fn invoke_impl(&self, args: ScalarFunctionArgs) -> GeoDataFusionResult { + let array = &ColumnarValue::values_to_arrays(&args.args)?[0]; + let field = &args.arg_fields[0]; + + let string_arr = array.as_ref(); + let len = string_arr.len(); + + let geom_type = GeometryType::new(Default::default()).with_coord_type(self.coord_type); + let mut builder = GeometryBuilder::new(geom_type); + + match field.data_type() { + DataType::Utf8 => { + let str_arr = string_arr + .as_any() + .downcast_ref::() + .unwrap(); + for i in 0..len { + if str_arr.is_null(i) { + builder.push_null(); + } else { + let geojson_str = str_arr.value(i); + let geo_geom = parse_geojson_to_geo(geojson_str)?; + builder.push_geometry(Some(&geo_geom))?; + } + } + } + DataType::LargeUtf8 => { + let str_arr = string_arr + .as_any() + .downcast_ref::() + .unwrap(); + for i in 0..len { + if str_arr.is_null(i) { + builder.push_null(); + } else { + let geojson_str = str_arr.value(i); + let geo_geom = parse_geojson_to_geo(geojson_str)?; + builder.push_geometry(Some(&geo_geom))?; + } + } + } + DataType::Utf8View => { + let str_arr = string_arr + .as_any() + .downcast_ref::() + .unwrap(); + for i in 0..len { + if str_arr.is_null(i) { + builder.push_null(); + } else { + let geojson_str = str_arr.value(i); + let geo_geom = parse_geojson_to_geo(geojson_str)?; + builder.push_geometry(Some(&geo_geom))?; + } + } + } + _ => { + return Err(DataFusionError::Internal(format!( + "Unexpected data type for ST_GeomFromGeoJSON: {:?}", + string_arr.data_type() + )) + .into()) + } + } + + Ok(ColumnarValue::Array(builder.finish().to_array_ref())) + } +} + +fn parse_geojson_to_geo(geojson_str: &str) -> GeoDataFusionResult { + use geojson::GeoJson; + + let geojson = GeoJson::from_str(geojson_str).map_err(|e| { + DataFusionError::Execution(format!("Failed to parse GeoJSON: {}", e)) + })?; + + let geometry: geojson::Geometry = match geojson { + GeoJson::Geometry(g) => g, + GeoJson::Feature(f) => f.geometry.ok_or_else(|| { + DataFusionError::Execution("GeoJSON Feature has no geometry".to_string()) + })?, + GeoJson::FeatureCollection(_) => { + return Err(DataFusionError::Execution( + "GeoJSON FeatureCollection is not supported; pass a single geometry".to_string(), + ) + .into()); + } + }; + + let geo_geom: geo::Geometry = + geo::Geometry::try_from(geometry).map_err(|e| { + DataFusionError::Execution(format!("Failed to convert GeoJSON to geo type: {}", e)) + })?; + + Ok(geo_geom) +} + +impl Default for GeomFromGeoJSON { + fn default() -> Self { + Self::new(Default::default()) + } +} + +static GEOM_FROM_GEOJSON_DOC: OnceLock = OnceLock::new(); + +impl ScalarUDFImpl for GeomFromGeoJSON { + fn as_any(&self) -> &dyn Any { + self + } + + fn name(&self) -> &str { + "st_geomfromgeojson" + } + + fn signature(&self) -> &Signature { + &self.signature + } + + fn return_type(&self, _arg_types: &[DataType]) -> Result { + Err(DataFusionError::Internal("return_type".to_string())) + } + + fn return_field_from_args(&self, args: ReturnFieldArgs) -> Result> { + let input_field = &args.arg_fields[0]; + let metadata = Arc::new(Metadata::try_from(input_field.as_ref()).unwrap_or_default()); + let geom_type = GeometryType::new(metadata).with_coord_type(self.coord_type); + Ok(geom_type + .to_field(input_field.name(), input_field.is_nullable()) + .into()) + } + + fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result { + Ok(self.invoke_impl(args)?) + } + + fn documentation(&self) -> Option<&Documentation> { + Some(GEOM_FROM_GEOJSON_DOC.get_or_init(|| { + Documentation::builder( + DOC_SECTION_OTHER, + "Takes a GeoJSON representation of a geometry and creates a geometry object.", + "ST_GeomFromGeoJSON(text)", + ) + .with_argument("geojson", "GeoJSON string") + .build() + })) + } +} + +#[cfg(test)] +mod test { + use datafusion::prelude::SessionContext; + + use super::*; + use crate::udf::native::io::GeomFromText; + + #[tokio::test] + async fn test_as_geojson() { + let ctx = SessionContext::new(); + + ctx.register_udf(AsGeoJSON::new().into()); + ctx.register_udf(GeomFromText::default().into()); + + // Use ST_GeomFromText to create geometry via SQL to avoid empty-point issues + // with the test helper arrays from geoarrow_array. + let sql_df = ctx + .sql("SELECT ST_AsGeoJSON(ST_GeomFromText('POINT(30 10)'));") + .await + .unwrap(); + + let output_batches = sql_df.collect().await.unwrap(); + assert_eq!(output_batches.len(), 1); + let output_batch = &output_batches[0]; + + // Verify output is Utf8 + let output_schema = output_batch.schema(); + let output_field = output_schema.field(0); + assert_eq!(output_field.data_type(), &DataType::Utf8); + + // Verify the GeoJSON contains expected structure + let str_arr = output_batch + .column(0) + .as_any() + .downcast_ref::() + .unwrap(); + assert_eq!(str_arr.len(), 1); + let first = str_arr.value(0); + assert!(first.contains("\"type\"")); + assert!(first.contains("\"coordinates\"")); + assert!(first.contains("30")); + assert!(first.contains("10")); + } + + #[tokio::test] + async fn test_geom_from_geojson() { + let ctx = SessionContext::new(); + + ctx.register_udf(GeomFromGeoJSON::new(CoordType::Separated).into()); + ctx.register_udf(AsGeoJSON::new().into()); + + let sql_df = ctx + .sql(r#"SELECT ST_AsGeoJSON(ST_GeomFromGeoJSON('{"type":"Point","coordinates":[30,10]}'));"#) + .await + .unwrap(); + + let output_batches = sql_df.collect().await.unwrap(); + assert_eq!(output_batches.len(), 1); + let output_batch = &output_batches[0]; + let str_arr = output_batch + .column(0) + .as_any() + .downcast_ref::() + .unwrap(); + let result = str_arr.value(0); + assert!(result.contains("30")); + assert!(result.contains("10")); + } + + #[tokio::test] + async fn test_geom_from_geojson_polygon() { + let ctx = SessionContext::new(); + + ctx.register_udf(GeomFromGeoJSON::new(CoordType::Separated).into()); + ctx.register_udf(AsGeoJSON::new().into()); + + let sql_df = ctx + .sql(r#"SELECT ST_AsGeoJSON(ST_GeomFromGeoJSON('{"type":"Polygon","coordinates":[[[0,0],[10,0],[10,10],[0,10],[0,0]]]}'));"#) + .await + .unwrap(); + + let output_batches = sql_df.collect().await.unwrap(); + assert_eq!(output_batches.len(), 1); + let output_batch = &output_batches[0]; + let str_arr = output_batch + .column(0) + .as_any() + .downcast_ref::() + .unwrap(); + let result = str_arr.value(0); + assert!(result.contains("Polygon")); + } +} diff --git a/rust/geodatafusion/src/udf/native/io/mod.rs b/rust/geodatafusion/src/udf/native/io/mod.rs index 376bfb6..b0adc8b 100644 --- a/rust/geodatafusion/src/udf/native/io/mod.rs +++ b/rust/geodatafusion/src/udf/native/io/mod.rs @@ -1,8 +1,12 @@ //! Geometry Input and Output +mod ewkt; +mod geojson; mod wkb; mod wkt; +pub use ewkt::AsEWKT; +pub use geojson::{AsGeoJSON, GeomFromGeoJSON}; pub use wkb::{AsBinary, GeomFromWKB}; pub use wkt::{AsText, GeomFromText}; @@ -11,4 +15,7 @@ pub fn register(session_context: &datafusion::prelude::SessionContext) { session_context.register_udf(GeomFromWKB::default().into()); session_context.register_udf(AsText.into()); session_context.register_udf(GeomFromText::default().into()); + session_context.register_udf(AsGeoJSON.into()); + session_context.register_udf(GeomFromGeoJSON::default().into()); + session_context.register_udf(AsEWKT.into()); }