diff --git a/Cargo.lock b/Cargo.lock index ede8317..3788d6d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -659,7 +659,7 @@ dependencies = [ "log", "object_store", "parking_lot", - "rand", + "rand 0.9.2", "regex", "sqlparser", "tempfile", @@ -774,7 +774,7 @@ dependencies = [ "itertools 0.14.0", "log", "object_store", - "rand", + "rand 0.9.2", "tokio", "url", ] @@ -900,7 +900,7 @@ dependencies = [ "log", "object_store", "parking_lot", - "rand", + "rand 0.9.2", "tempfile", "url", ] @@ -961,7 +961,7 @@ dependencies = [ "itertools 0.14.0", "log", "num-traits", - "rand", + "rand 0.9.2", "regex", "unicode-segmentation", "uuid", @@ -1640,7 +1640,7 @@ dependencies = [ [[package]] name = "geodatafusion" -version = "0.3.0" +version = "0.4.0-hotdata.1" dependencies = [ "approx", "arrow-arith", @@ -1653,7 +1653,10 @@ dependencies = [ "geoarrow-expr-geo", "geoarrow-schema", "geohash", + "geojson", "object_store", + "serde_json", + "spatialbench", "thiserror 1.0.69", "tokio", "wkt 0.14.0", @@ -1661,7 +1664,7 @@ dependencies = [ [[package]] name = "geodatafusion-flatgeobuf" -version = "0.3.0" +version = "0.4.0-hotdata.1" dependencies = [ "approx", "arrow-array", @@ -1685,7 +1688,7 @@ dependencies = [ [[package]] name = "geodatafusion-geojson" -version = "0.3.0" +version = "0.4.0-hotdata.1" dependencies = [ "arrow-array", "arrow-schema", @@ -1707,7 +1710,7 @@ dependencies = [ [[package]] name = "geodatafusion-geoparquet" -version = "0.3.0" +version = "0.4.0-hotdata.1" dependencies = [ "approx", "arrow-schema", @@ -2548,7 +2551,7 @@ dependencies = [ "parking_lot", "percent-encoding", "quick-xml", - "rand", + "rand 0.9.2", "reqwest", "ring", "serde", @@ -2785,7 +2788,7 @@ dependencies = [ "bytes", "getrandom 0.3.3", "lru-slab", - "rand", + "rand 0.9.2", "ring", "rustc-hash", "rustls", @@ -2826,14 +2829,35 @@ version = "5.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "69cdb34c158ceb288df11e18b4bd39de994f6657d83847bdffdbd7f346754b0f" +[[package]] +name = "rand" +version = "0.8.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "34af8d1a0e25924bc5b7c43c079c942339d8f0a8b57c39049bef581b46327404" +dependencies = [ + "libc", + "rand_chacha 0.3.1", + "rand_core 0.6.4", +] + [[package]] name = "rand" version = "0.9.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6db2770f06117d490610c7488547d543617b21bfa07796d7a12f6f1bd53850d1" dependencies = [ - "rand_chacha", - "rand_core", + "rand_chacha 0.9.0", + "rand_core 0.9.3", +] + +[[package]] +name = "rand_chacha" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e6c10a63a0fa32252be49d21e7709d4d4baf8d231c2dbce1eaa8141b9b127d88" +dependencies = [ + "ppv-lite86", + "rand_core 0.6.4", ] [[package]] @@ -2843,7 +2867,16 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d3022b5f1df60f26e1ffddd6c66e8aa15de382ae63b3a0c1bfc0e4d3e3f325cb" dependencies = [ "ppv-lite86", - "rand_core", + "rand_core 0.9.3", +] + +[[package]] +name = "rand_core" +version = "0.6.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ec0be4795e2f6a28069bec0b5ff3e2ac9bafc99e6a9a7dc3547996c5c816922c" +dependencies = [ + "getrandom 0.2.16", ] [[package]] @@ -3335,6 +3368,18 @@ dependencies = [ "smallvec", ] +[[package]] +name = "spatialbench" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "07f3f4b67ccf571f183d3695aa6b9d6f996864c31782a480e97a23ed0f2f6f18" +dependencies = [ + "geo", + "once_cell", + "rand 0.8.5", + "serde", +] + [[package]] name = "sqlparser" version = "0.59.0" diff --git a/rust/geodatafusion/Cargo.toml b/rust/geodatafusion/Cargo.toml index 5fb3ddc..2e59cd1 100644 --- a/rust/geodatafusion/Cargo.toml +++ b/rust/geodatafusion/Cargo.toml @@ -21,12 +21,15 @@ geoarrow-array = { workspace = true } geoarrow-expr-geo = { workspace = true } geoarrow-schema = { workspace = true } geohash = { workspace = true } +geojson = { workspace = true } +serde_json = { workspace = true } thiserror = { workspace = true } wkt = { workspace = true } [dev-dependencies] approx = { workspace = true } datafusion = { workspace = true, features = ["sql"] } +spatialbench = { workspace = true } geo-traits = { workspace = true } geoarrow-array = { workspace = true, features = ["test-data"] } object_store = { workspace = true, features = ["http"] } diff --git a/rust/geodatafusion/src/udf/native/accessors/geometry_accessors.rs b/rust/geodatafusion/src/udf/native/accessors/geometry_accessors.rs new file mode 100644 index 0000000..822639d --- /dev/null +++ b/rust/geodatafusion/src/udf/native/accessors/geometry_accessors.rs @@ -0,0 +1,963 @@ +//! Additional geometry accessor functions: ST_IsEmpty, ST_IsClosed, ST_IsSimple, +//! ST_IsRing, ST_IsCollection, ST_Dimension, ST_NumGeometries, ST_HasZ, ST_HasM, +//! ST_NRings. + +use std::any::Any; +use std::sync::{Arc, OnceLock}; + +use arrow_array::builder::{BooleanBuilder, Int32Builder, UInt32Builder}; +use arrow_array::{ArrayRef, BooleanArray, Int32Array, UInt32Array}; +use arrow_schema::DataType; +use datafusion::error::Result; +use datafusion::logical_expr::scalar_doc_sections::DOC_SECTION_OTHER; +use datafusion::logical_expr::{ + ColumnarValue, Documentation, ScalarFunctionArgs, ScalarUDFImpl, Signature, +}; +use geo_traits::*; +use geoarrow_array::array::from_arrow_array; +use geoarrow_array::{GeoArrowArray, GeoArrowArrayAccessor, downcast_geoarrow_array}; +use geoarrow_schema::error::GeoArrowResult; +use geoarrow_schema::Dimension; + +use crate::data_types::any_single_geometry_type_input; +use crate::error::GeoDataFusionResult; + +// ========================================================================= +// ST_IsEmpty +// ========================================================================= +#[derive(Debug, Eq, PartialEq, Hash)] +pub struct IsEmpty; + +impl Default for IsEmpty { + fn default() -> Self { + Self + } +} + +static IS_EMPTY_DOC: OnceLock = OnceLock::new(); + +impl ScalarUDFImpl for IsEmpty { + fn as_any(&self) -> &dyn Any { + self + } + + fn name(&self) -> &str { + "st_isempty" + } + + fn signature(&self) -> &Signature { + any_single_geometry_type_input() + } + + fn return_type(&self, _arg_types: &[DataType]) -> Result { + Ok(DataType::Boolean) + } + + fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result { + Ok(is_empty_impl(args)?) + } + + fn documentation(&self) -> Option<&Documentation> { + Some(IS_EMPTY_DOC.get_or_init(|| { + Documentation::builder( + DOC_SECTION_OTHER, + "Returns true if the geometry is an empty geometry (has no points).", + "ST_IsEmpty(geometry)", + ) + .with_argument("geom", "geometry") + .build() + })) + } +} + +fn is_empty_impl(args: ScalarFunctionArgs) -> GeoDataFusionResult { + let arrays = ColumnarValue::values_to_arrays(&args.args)?; + let geo_array = from_arrow_array(&arrays[0], &args.arg_fields[0])?; + let result = is_empty_array(&geo_array)?; + Ok(ColumnarValue::Array(Arc::new(result) as ArrayRef)) +} + +fn is_empty_array(array: &dyn GeoArrowArray) -> GeoArrowResult { + downcast_geoarrow_array!(array, _is_empty_impl) +} + +fn _is_empty_impl<'a>(array: &'a impl GeoArrowArrayAccessor<'a>) -> GeoArrowResult { + let mut builder = BooleanBuilder::with_capacity(array.len()); + for item in array.iter() { + if let Some(geom) = item { + let g = geom?; + let empty = match g.as_type() { + GeometryType::Point(p) => p.coord().is_none(), + GeometryType::LineString(ls) => ls.num_coords() == 0, + GeometryType::Polygon(p) => { + p.exterior().map_or(true, |ext| ext.num_coords() == 0) + } + GeometryType::MultiPoint(mp) => mp.num_points() == 0, + GeometryType::MultiLineString(mls) => mls.num_line_strings() == 0, + GeometryType::MultiPolygon(mp) => mp.num_polygons() == 0, + GeometryType::GeometryCollection(gc) => gc.num_geometries() == 0, + _ => false, + }; + builder.append_value(empty); + } else { + builder.append_null(); + } + } + Ok(builder.finish()) +} + +// ========================================================================= +// ST_IsClosed +// ========================================================================= +#[derive(Debug, Eq, PartialEq, Hash)] +pub struct IsClosed; + +impl Default for IsClosed { + fn default() -> Self { + Self + } +} + +static IS_CLOSED_DOC: OnceLock = OnceLock::new(); + +impl ScalarUDFImpl for IsClosed { + fn as_any(&self) -> &dyn Any { + self + } + + fn name(&self) -> &str { + "st_isclosed" + } + + fn signature(&self) -> &Signature { + any_single_geometry_type_input() + } + + fn return_type(&self, _arg_types: &[DataType]) -> Result { + Ok(DataType::Boolean) + } + + fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result { + Ok(is_closed_impl(args)?) + } + + fn documentation(&self) -> Option<&Documentation> { + Some(IS_CLOSED_DOC.get_or_init(|| { + Documentation::builder( + DOC_SECTION_OTHER, + "Returns true if the geometry's start and end points are coincident.", + "ST_IsClosed(geometry)", + ) + .with_argument("geom", "geometry") + .build() + })) + } +} + +fn is_closed_impl(args: ScalarFunctionArgs) -> GeoDataFusionResult { + let arrays = ColumnarValue::values_to_arrays(&args.args)?; + let geo_array = from_arrow_array(&arrays[0], &args.arg_fields[0])?; + let result = is_closed_array(&geo_array)?; + Ok(ColumnarValue::Array(Arc::new(result) as ArrayRef)) +} + +fn is_closed_array(array: &dyn GeoArrowArray) -> GeoArrowResult { + downcast_geoarrow_array!(array, _is_closed_impl) +} + +fn _is_closed_impl<'a>( + array: &'a impl GeoArrowArrayAccessor<'a>, +) -> GeoArrowResult { + use geoarrow_expr_geo::util::to_geo::geometry_to_geo; + let mut builder = BooleanBuilder::with_capacity(array.len()); + for item in array.iter() { + if let Some(geom) = item { + let geo_geom = geometry_to_geo(&geom?)?; + let closed = is_geo_closed(&geo_geom); + builder.append_value(closed); + } else { + builder.append_null(); + } + } + Ok(builder.finish()) +} + +fn is_geo_closed(geom: &geo::Geometry) -> bool { + match geom { + geo::Geometry::Point(_) => true, + geo::Geometry::LineString(ls) => is_linestring_closed(ls), + geo::Geometry::Polygon(_) => true, + geo::Geometry::MultiLineString(mls) => mls.0.iter().all(is_linestring_closed), + _ => true, + } +} + +fn is_linestring_closed(ls: &geo::LineString) -> bool { + if ls.0.len() < 2 { + return false; + } + let first = ls.0.first().unwrap(); + let last = ls.0.last().unwrap(); + first.x == last.x && first.y == last.y +} + +// ========================================================================= +// ST_IsSimple +// ========================================================================= +#[derive(Debug, Eq, PartialEq, Hash)] +pub struct IsSimple; + +impl Default for IsSimple { + fn default() -> Self { + Self + } +} + +static IS_SIMPLE_DOC: OnceLock = OnceLock::new(); + +impl ScalarUDFImpl for IsSimple { + fn as_any(&self) -> &dyn Any { + self + } + + fn name(&self) -> &str { + "st_issimple" + } + + fn signature(&self) -> &Signature { + any_single_geometry_type_input() + } + + fn return_type(&self, _arg_types: &[DataType]) -> Result { + Ok(DataType::Boolean) + } + + fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result { + Ok(is_simple_impl(args)?) + } + + fn documentation(&self) -> Option<&Documentation> { + Some(IS_SIMPLE_DOC.get_or_init(|| { + Documentation::builder( + DOC_SECTION_OTHER, + "Returns true if the geometry has no anomalous geometric points.", + "ST_IsSimple(geometry)", + ) + .with_argument("geom", "geometry") + .build() + })) + } +} + +fn is_simple_impl(args: ScalarFunctionArgs) -> GeoDataFusionResult { + let arrays = ColumnarValue::values_to_arrays(&args.args)?; + let geo_array = from_arrow_array(&arrays[0], &args.arg_fields[0])?; + let result = is_simple_array(&geo_array)?; + Ok(ColumnarValue::Array(Arc::new(result) as ArrayRef)) +} + +fn is_simple_array(array: &dyn GeoArrowArray) -> GeoArrowResult { + downcast_geoarrow_array!(array, _is_simple_impl) +} + +fn _is_simple_impl<'a>( + array: &'a impl GeoArrowArrayAccessor<'a>, +) -> GeoArrowResult { + use geoarrow_expr_geo::util::to_geo::geometry_to_geo; + let mut builder = BooleanBuilder::with_capacity(array.len()); + for item in array.iter() { + if let Some(geom) = item { + let geo_geom = geometry_to_geo(&geom?)?; + let simple = is_geometry_simple(&geo_geom); + builder.append_value(simple); + } else { + builder.append_null(); + } + } + Ok(builder.finish()) +} + +fn is_geometry_simple(geom: &geo::Geometry) -> bool { + match geom { + geo::Geometry::Point(_) => true, + geo::Geometry::MultiPoint(mp) => { + // Simple if no two points are equal + for i in 0..mp.0.len() { + for j in (i + 1)..mp.0.len() { + if mp.0[i] == mp.0[j] { + return false; + } + } + } + true + } + geo::Geometry::LineString(ls) => is_linestring_simple(ls), + geo::Geometry::MultiLineString(mls) => mls.0.iter().all(is_linestring_simple), + geo::Geometry::Polygon(_) | geo::Geometry::MultiPolygon(_) => true, + geo::Geometry::GeometryCollection(gc) => gc.0.iter().all(is_geometry_simple), + _ => true, + } +} + +fn is_linestring_simple(ls: &geo::LineString) -> bool { + use geo::line_intersection::{line_intersection, LineIntersection}; + + if ls.0.len() < 2 { + return true; + } + let lines: Vec = ls.lines().collect(); + for i in 0..lines.len() { + // Check against non-adjacent segments + for j in (i + 2)..lines.len() { + // Skip adjacent segments (i and i+1 share an endpoint) + // Also skip the pair of first and last segment if the linestring is closed + // (they share the start/end point by definition) + if i == 0 && j == lines.len() - 1 && is_linestring_closed(ls) { + continue; + } + if let Some(intersection) = line_intersection(lines[i], lines[j]) { + match intersection { + LineIntersection::SinglePoint { intersection, .. } => { + // Shared endpoint between non-adjacent segments is OK + // only if it's at the ends of both segments + let is_endpoint_i = + intersection == lines[i].start || intersection == lines[i].end; + let is_endpoint_j = + intersection == lines[j].start || intersection == lines[j].end; + if !(is_endpoint_i && is_endpoint_j) { + return false; + } + } + LineIntersection::Collinear { .. } => return false, + } + } + } + } + true +} + +// ========================================================================= +// ST_IsRing +// ========================================================================= +#[derive(Debug, Eq, PartialEq, Hash)] +pub struct IsRing; + +impl Default for IsRing { + fn default() -> Self { + Self + } +} + +static IS_RING_DOC: OnceLock = OnceLock::new(); + +impl ScalarUDFImpl for IsRing { + fn as_any(&self) -> &dyn Any { + self + } + + fn name(&self) -> &str { + "st_isring" + } + + fn signature(&self) -> &Signature { + any_single_geometry_type_input() + } + + fn return_type(&self, _arg_types: &[DataType]) -> Result { + Ok(DataType::Boolean) + } + + fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result { + Ok(is_ring_impl(args)?) + } + + fn documentation(&self) -> Option<&Documentation> { + Some(IS_RING_DOC.get_or_init(|| { + Documentation::builder( + DOC_SECTION_OTHER, + "Returns true if the geometry is a closed and simple linestring (a ring).", + "ST_IsRing(geometry)", + ) + .with_argument("geom", "geometry") + .build() + })) + } +} + +fn is_ring_impl(args: ScalarFunctionArgs) -> GeoDataFusionResult { + let arrays = ColumnarValue::values_to_arrays(&args.args)?; + let geo_array = from_arrow_array(&arrays[0], &args.arg_fields[0])?; + let result = is_ring_array(&geo_array)?; + Ok(ColumnarValue::Array(Arc::new(result) as ArrayRef)) +} + +fn is_ring_array(array: &dyn GeoArrowArray) -> GeoArrowResult { + downcast_geoarrow_array!(array, _is_ring_impl) +} + +fn _is_ring_impl<'a>(array: &'a impl GeoArrowArrayAccessor<'a>) -> GeoArrowResult { + use geoarrow_expr_geo::util::to_geo::geometry_to_geo; + let mut builder = BooleanBuilder::with_capacity(array.len()); + for item in array.iter() { + if let Some(geom) = item { + let geo_geom = geometry_to_geo(&geom?)?; + let ring = match &geo_geom { + geo::Geometry::LineString(ls) => { + ls.0.len() >= 4 && is_linestring_closed(ls) && is_linestring_simple(ls) + } + _ => false, + }; + builder.append_value(ring); + } else { + builder.append_null(); + } + } + Ok(builder.finish()) +} + +// ========================================================================= +// ST_IsCollection +// ========================================================================= +#[derive(Debug, Eq, PartialEq, Hash)] +pub struct IsCollection; + +impl Default for IsCollection { + fn default() -> Self { + Self + } +} + +static IS_COLLECTION_DOC: OnceLock = OnceLock::new(); + +impl ScalarUDFImpl for IsCollection { + fn as_any(&self) -> &dyn Any { + self + } + + fn name(&self) -> &str { + "st_iscollection" + } + + fn signature(&self) -> &Signature { + any_single_geometry_type_input() + } + + fn return_type(&self, _arg_types: &[DataType]) -> Result { + Ok(DataType::Boolean) + } + + fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result { + Ok(is_collection_impl(args)?) + } + + fn documentation(&self) -> Option<&Documentation> { + Some(IS_COLLECTION_DOC.get_or_init(|| { + Documentation::builder( + DOC_SECTION_OTHER, + "Returns true if the geometry is a collection type.", + "ST_IsCollection(geometry)", + ) + .with_argument("geom", "geometry") + .build() + })) + } +} + +fn is_collection_impl(args: ScalarFunctionArgs) -> GeoDataFusionResult { + let arrays = ColumnarValue::values_to_arrays(&args.args)?; + let geo_array = from_arrow_array(&arrays[0], &args.arg_fields[0])?; + let result = is_collection_array(&geo_array)?; + Ok(ColumnarValue::Array(Arc::new(result) as ArrayRef)) +} + +fn is_collection_array(array: &dyn GeoArrowArray) -> GeoArrowResult { + downcast_geoarrow_array!(array, _is_collection_impl) +} + +fn _is_collection_impl<'a>( + array: &'a impl GeoArrowArrayAccessor<'a>, +) -> GeoArrowResult { + let mut builder = BooleanBuilder::with_capacity(array.len()); + for item in array.iter() { + if let Some(geom) = item { + let g = geom?; + let is_coll = matches!( + g.as_type(), + GeometryType::MultiPoint(_) + | GeometryType::MultiLineString(_) + | GeometryType::MultiPolygon(_) + | GeometryType::GeometryCollection(_) + ); + builder.append_value(is_coll); + } else { + builder.append_null(); + } + } + Ok(builder.finish()) +} + +// ========================================================================= +// ST_Dimension +// ========================================================================= +#[derive(Debug, Eq, PartialEq, Hash)] +pub struct StDimension; + +impl Default for StDimension { + fn default() -> Self { + Self + } +} + +static DIMENSION_DOC: OnceLock = OnceLock::new(); + +impl ScalarUDFImpl for StDimension { + fn as_any(&self) -> &dyn Any { + self + } + + fn name(&self) -> &str { + "st_dimension" + } + + fn signature(&self) -> &Signature { + any_single_geometry_type_input() + } + + fn return_type(&self, _arg_types: &[DataType]) -> Result { + Ok(DataType::Int32) + } + + fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result { + Ok(dimension_impl(args)?) + } + + fn documentation(&self) -> Option<&Documentation> { + Some(DIMENSION_DOC.get_or_init(|| { + Documentation::builder( + DOC_SECTION_OTHER, + "Returns the topological dimension of the geometry.", + "ST_Dimension(geometry)", + ) + .with_argument("geom", "geometry") + .build() + })) + } +} + +fn dimension_impl(args: ScalarFunctionArgs) -> GeoDataFusionResult { + let arrays = ColumnarValue::values_to_arrays(&args.args)?; + let geo_array = from_arrow_array(&arrays[0], &args.arg_fields[0])?; + let result = dimension_array(&geo_array)?; + Ok(ColumnarValue::Array(Arc::new(result) as ArrayRef)) +} + +fn dimension_array(array: &dyn GeoArrowArray) -> GeoArrowResult { + downcast_geoarrow_array!(array, _dimension_impl) +} + +fn _dimension_impl<'a>(array: &'a impl GeoArrowArrayAccessor<'a>) -> GeoArrowResult { + let mut builder = Int32Builder::with_capacity(array.len()); + for item in array.iter() { + if let Some(geom) = item { + let g = geom?; + let dim = match g.as_type() { + GeometryType::Point(_) | GeometryType::MultiPoint(_) => 0, + GeometryType::LineString(_) + | GeometryType::MultiLineString(_) + | GeometryType::Line(_) => 1, + GeometryType::Polygon(_) + | GeometryType::MultiPolygon(_) + | GeometryType::Triangle(_) + | GeometryType::Rect(_) => 2, + GeometryType::GeometryCollection(gc) => { + let mut max_dim = -1i32; + for g in gc.geometries() { + let d = match g.as_type() { + GeometryType::Point(_) | GeometryType::MultiPoint(_) => 0, + GeometryType::LineString(_) + | GeometryType::MultiLineString(_) => 1, + GeometryType::Polygon(_) + | GeometryType::MultiPolygon(_) => 2, + _ => 0, + }; + if d > max_dim { + max_dim = d; + } + } + max_dim + } + }; + builder.append_value(dim); + } else { + builder.append_null(); + } + } + Ok(builder.finish()) +} + +// ========================================================================= +// ST_NumGeometries +// ========================================================================= +#[derive(Debug, Eq, PartialEq, Hash)] +pub struct NumGeometries; + +impl Default for NumGeometries { + fn default() -> Self { + Self + } +} + +static NUM_GEOMETRIES_DOC: OnceLock = OnceLock::new(); + +impl ScalarUDFImpl for NumGeometries { + fn as_any(&self) -> &dyn Any { + self + } + + fn name(&self) -> &str { + "st_numgeometries" + } + + fn signature(&self) -> &Signature { + any_single_geometry_type_input() + } + + fn return_type(&self, _arg_types: &[DataType]) -> Result { + Ok(DataType::UInt32) + } + + fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result { + Ok(num_geometries_impl(args)?) + } + + fn documentation(&self) -> Option<&Documentation> { + Some(NUM_GEOMETRIES_DOC.get_or_init(|| { + Documentation::builder( + DOC_SECTION_OTHER, + "Returns the number of elements in a geometry collection.", + "ST_NumGeometries(geometry)", + ) + .with_argument("geom", "geometry") + .build() + })) + } +} + +fn num_geometries_impl(args: ScalarFunctionArgs) -> GeoDataFusionResult { + let arrays = ColumnarValue::values_to_arrays(&args.args)?; + let geo_array = from_arrow_array(&arrays[0], &args.arg_fields[0])?; + let result = num_geometries_array(&geo_array)?; + Ok(ColumnarValue::Array(Arc::new(result) as ArrayRef)) +} + +fn num_geometries_array(array: &dyn GeoArrowArray) -> GeoArrowResult { + downcast_geoarrow_array!(array, _num_geometries_impl) +} + +fn _num_geometries_impl<'a>( + array: &'a impl GeoArrowArrayAccessor<'a>, +) -> GeoArrowResult { + let mut builder = UInt32Builder::with_capacity(array.len()); + for item in array.iter() { + if let Some(geom) = item { + let g = geom?; + let count = match g.as_type() { + GeometryType::Point(_) + | GeometryType::LineString(_) + | GeometryType::Polygon(_) => 1u32, + GeometryType::MultiPoint(mp) => mp.num_points() as u32, + GeometryType::MultiLineString(mls) => mls.num_line_strings() as u32, + GeometryType::MultiPolygon(mp) => mp.num_polygons() as u32, + GeometryType::GeometryCollection(gc) => gc.num_geometries() as u32, + _ => 1, + }; + builder.append_value(count); + } else { + builder.append_null(); + } + } + Ok(builder.finish()) +} + +// ========================================================================= +// ST_NRings +// ========================================================================= +#[derive(Debug, Eq, PartialEq, Hash)] +pub struct NRings; + +impl Default for NRings { + fn default() -> Self { + Self + } +} + +static NRINGS_DOC: OnceLock = OnceLock::new(); + +impl ScalarUDFImpl for NRings { + fn as_any(&self) -> &dyn Any { + self + } + + fn name(&self) -> &str { + "st_nrings" + } + + fn signature(&self) -> &Signature { + any_single_geometry_type_input() + } + + fn return_type(&self, _arg_types: &[DataType]) -> Result { + Ok(DataType::UInt32) + } + + fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result { + Ok(nrings_impl(args)?) + } + + fn documentation(&self) -> Option<&Documentation> { + Some(NRINGS_DOC.get_or_init(|| { + Documentation::builder( + DOC_SECTION_OTHER, + "Returns the total number of rings in a geometry.", + "ST_NRings(geometry)", + ) + .with_argument("geom", "geometry") + .build() + })) + } +} + +fn nrings_impl(args: ScalarFunctionArgs) -> GeoDataFusionResult { + let arrays = ColumnarValue::values_to_arrays(&args.args)?; + let geo_array = from_arrow_array(&arrays[0], &args.arg_fields[0])?; + let result = nrings_array(&geo_array)?; + Ok(ColumnarValue::Array(Arc::new(result) as ArrayRef)) +} + +fn nrings_array(array: &dyn GeoArrowArray) -> GeoArrowResult { + downcast_geoarrow_array!(array, _nrings_impl) +} + +fn _nrings_impl<'a>(array: &'a impl GeoArrowArrayAccessor<'a>) -> GeoArrowResult { + let mut builder = UInt32Builder::with_capacity(array.len()); + for item in array.iter() { + if let Some(geom) = item { + let g = geom?; + let count = count_rings(&g); + builder.append_value(count); + } else { + builder.append_null(); + } + } + Ok(builder.finish()) +} + +fn count_rings(geom: &impl GeometryTrait) -> u32 { + match geom.as_type() { + GeometryType::Polygon(p) => { + let exterior = if p.exterior().is_some() { 1u32 } else { 0 }; + exterior + p.num_interiors() as u32 + } + GeometryType::MultiPolygon(mp) => { + (0..mp.num_polygons()) + .filter_map(|i| mp.polygon(i)) + .map(|p| count_rings_polygon(&p)) + .sum() + } + GeometryType::GeometryCollection(gc) => gc.geometries().map(|g| count_rings(&g)).sum(), + _ => 0, + } +} + +fn count_rings_polygon(p: &impl PolygonTrait) -> u32 { + let exterior = if p.exterior().is_some() { 1u32 } else { 0 }; + exterior + p.num_interiors() as u32 +} + +// ========================================================================= +// ST_HasZ +// ========================================================================= +#[derive(Debug, Eq, PartialEq, Hash)] +pub struct HasZ; + +impl Default for HasZ { + fn default() -> Self { + Self + } +} + +static HASZ_DOC: OnceLock = OnceLock::new(); + +impl ScalarUDFImpl for HasZ { + fn as_any(&self) -> &dyn Any { + self + } + + fn name(&self) -> &str { + "st_hasz" + } + + fn signature(&self) -> &Signature { + any_single_geometry_type_input() + } + + fn return_type(&self, _arg_types: &[DataType]) -> Result { + Ok(DataType::Boolean) + } + + fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result { + let arrays = ColumnarValue::values_to_arrays(&args.args)?; + let geo_array = from_arrow_array(&arrays[0], &args.arg_fields[0]) + .map_err(|e| datafusion::error::DataFusionError::Internal(e.to_string()))?; + let dim = geo_array.data_type().dimension(); + let has_z = matches!(dim, Some(Dimension::XYZ) | Some(Dimension::XYZM)); + let result = BooleanArray::from(vec![Some(has_z); arrays[0].len()]); + Ok(ColumnarValue::Array(Arc::new(result))) + } + + fn documentation(&self) -> Option<&Documentation> { + Some(HASZ_DOC.get_or_init(|| { + Documentation::builder( + DOC_SECTION_OTHER, + "Returns true if the geometry has a Z coordinate dimension.", + "ST_HasZ(geometry)", + ) + .with_argument("geom", "geometry") + .build() + })) + } +} + +// ========================================================================= +// ST_HasM +// ========================================================================= +#[derive(Debug, Eq, PartialEq, Hash)] +pub struct HasM; + +impl Default for HasM { + fn default() -> Self { + Self + } +} + +static HASM_DOC: OnceLock = OnceLock::new(); + +impl ScalarUDFImpl for HasM { + fn as_any(&self) -> &dyn Any { + self + } + + fn name(&self) -> &str { + "st_hasm" + } + + fn signature(&self) -> &Signature { + any_single_geometry_type_input() + } + + fn return_type(&self, _arg_types: &[DataType]) -> Result { + Ok(DataType::Boolean) + } + + fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result { + let arrays = ColumnarValue::values_to_arrays(&args.args)?; + let geo_array = from_arrow_array(&arrays[0], &args.arg_fields[0]) + .map_err(|e| datafusion::error::DataFusionError::Internal(e.to_string()))?; + let dim = geo_array.data_type().dimension(); + let has_m = matches!(dim, Some(Dimension::XYM) | Some(Dimension::XYZM)); + let result = BooleanArray::from(vec![Some(has_m); arrays[0].len()]); + Ok(ColumnarValue::Array(Arc::new(result))) + } + + fn documentation(&self) -> Option<&Documentation> { + Some(HASM_DOC.get_or_init(|| { + Documentation::builder( + DOC_SECTION_OTHER, + "Returns true if the geometry has an M (measure) coordinate dimension.", + "ST_HasM(geometry)", + ) + .with_argument("geom", "geometry") + .build() + })) + } +} + +#[cfg(test)] +mod test { + use arrow_array::cast::AsArray; + use arrow_array::types::{Int32Type, UInt32Type}; + use datafusion::prelude::SessionContext; + + use super::*; + use crate::udf::native::io::GeomFromText; + + #[tokio::test] + async fn test_is_empty_point() { + let ctx = SessionContext::new(); + ctx.register_udf(IsEmpty.into()); + ctx.register_udf(GeomFromText::default().into()); + + let df = ctx + .sql("SELECT ST_IsEmpty(ST_GeomFromText('POINT(0 0)'));") + .await + .unwrap(); + let batch = df.collect().await.unwrap().into_iter().next().unwrap(); + assert!(!batch.column(0).as_boolean().value(0)); + } + + #[tokio::test] + async fn test_is_closed_ring() { + let ctx = SessionContext::new(); + ctx.register_udf(IsClosed.into()); + ctx.register_udf(GeomFromText::default().into()); + + let df = ctx + .sql("SELECT ST_IsClosed(ST_GeomFromText('LINESTRING(0 0, 1 1, 1 0, 0 0)'));") + .await + .unwrap(); + let batch = df.collect().await.unwrap().into_iter().next().unwrap(); + assert!(batch.column(0).as_boolean().value(0)); + } + + #[tokio::test] + async fn test_is_closed_open() { + let ctx = SessionContext::new(); + ctx.register_udf(IsClosed.into()); + ctx.register_udf(GeomFromText::default().into()); + + let df = ctx + .sql("SELECT ST_IsClosed(ST_GeomFromText('LINESTRING(0 0, 1 1, 1 0)'));") + .await + .unwrap(); + let batch = df.collect().await.unwrap().into_iter().next().unwrap(); + assert!(!batch.column(0).as_boolean().value(0)); + } + + #[tokio::test] + async fn test_dimension() { + let ctx = SessionContext::new(); + ctx.register_udf(StDimension.into()); + ctx.register_udf(GeomFromText::default().into()); + + let df = ctx + .sql("SELECT ST_Dimension(ST_GeomFromText('POLYGON((0 0, 1 0, 1 1, 0 1, 0 0))'));") + .await + .unwrap(); + let batch = df.collect().await.unwrap().into_iter().next().unwrap(); + assert_eq!(batch.column(0).as_primitive::().value(0), 2); + } + + #[tokio::test] + async fn test_num_geometries_multi() { + let ctx = SessionContext::new(); + ctx.register_udf(NumGeometries.into()); + ctx.register_udf(GeomFromText::default().into()); + + let df = ctx + .sql("SELECT ST_NumGeometries(ST_GeomFromText('MULTIPOINT(0 0, 1 1, 2 2)'));") + .await + .unwrap(); + let batch = df.collect().await.unwrap().into_iter().next().unwrap(); + assert_eq!(batch.column(0).as_primitive::().value(0), 3); + } +} diff --git a/rust/geodatafusion/src/udf/native/accessors/mod.rs b/rust/geodatafusion/src/udf/native/accessors/mod.rs index 5a716ca..6532a63 100644 --- a/rust/geodatafusion/src/udf/native/accessors/mod.rs +++ b/rust/geodatafusion/src/udf/native/accessors/mod.rs @@ -1,4 +1,5 @@ mod coord_dim; +mod geometry_accessors; mod geometry_type; mod line_string; mod npoints; @@ -6,6 +7,10 @@ mod num_interior_rings; mod point; pub use coord_dim::{CoordDim, NDims}; +pub use geometry_accessors::{ + HasM, HasZ, IsClosed, IsCollection, IsEmpty, IsRing, IsSimple, NRings, NumGeometries, + StDimension, +}; pub use geometry_type::{GeometryType, ST_GeometryType}; pub use line_string::{EndPoint, StartPoint}; pub use npoints::NPoints; @@ -14,13 +19,23 @@ pub use point::{M, X, Y, Z}; pub fn register(session_context: &datafusion::prelude::SessionContext) { session_context.register_udf(CoordDim.into()); + session_context.register_udf(StDimension.into()); session_context.register_udf(NDims.into()); session_context.register_udf(GeometryType.into()); session_context.register_udf(ST_GeometryType.into()); session_context.register_udf(EndPoint::default().into()); - session_context.register_udf(StartPoint::default().into()); + session_context.register_udf(HasM.into()); + session_context.register_udf(HasZ.into()); + session_context.register_udf(IsClosed.into()); + session_context.register_udf(IsCollection.into()); + session_context.register_udf(IsEmpty.into()); + session_context.register_udf(IsRing.into()); + session_context.register_udf(IsSimple.into()); session_context.register_udf(NPoints.into()); + session_context.register_udf(NRings.into()); + session_context.register_udf(NumGeometries.into()); session_context.register_udf(NumInteriorRings.into()); + session_context.register_udf(StartPoint::default().into()); session_context.register_udf(M::default().into()); session_context.register_udf(X::default().into()); session_context.register_udf(Y::default().into()); diff --git a/rust/geodatafusion/src/udf/native/constructors/make.rs b/rust/geodatafusion/src/udf/native/constructors/make.rs new file mode 100644 index 0000000..54a5208 --- /dev/null +++ b/rust/geodatafusion/src/udf/native/constructors/make.rs @@ -0,0 +1,584 @@ +//! Line, Polygon, and Envelope constructors + +use std::any::Any; +use std::sync::{Arc, OnceLock}; + +use arrow_array::cast::AsArray; +use arrow_array::types::Float64Type; +use arrow_array::Array; +use arrow_schema::{DataType, FieldRef}; +use datafusion::error::{DataFusionError, Result}; +use datafusion::logical_expr::scalar_doc_sections::DOC_SECTION_OTHER; +use datafusion::logical_expr::{ + ColumnarValue, Documentation, ReturnFieldArgs, ScalarFunctionArgs, ScalarUDFImpl, Signature, + TypeSignature, Volatility, +}; +use geoarrow_array::array::from_arrow_array; +use geoarrow_array::builder::GeometryBuilder; +use geoarrow_array::{GeoArrowArray, GeoArrowArrayAccessor, downcast_geoarrow_array}; +use geoarrow_expr_geo::util::to_geo::geometry_to_geo; +use geoarrow_schema::error::GeoArrowResult; +use geoarrow_schema::{CoordType, GeometryType, Metadata}; + +use crate::data_types::any_single_geometry_type_input; +use crate::error::GeoDataFusionResult; + +// --------------------------------------------------------------------------- +// ST_MakeLine +// --------------------------------------------------------------------------- + +#[derive(Debug, Eq, PartialEq, Hash)] +pub struct MakeLine { + signature: Signature, + coord_type: CoordType, +} + +impl MakeLine { + pub fn new(coord_type: CoordType) -> Self { + Self { + signature: Signature::any(2, Volatility::Immutable), + coord_type, + } + } +} + +impl Default for MakeLine { + fn default() -> Self { + Self::new(Default::default()) + } +} + +static MAKE_LINE_DOC: OnceLock = OnceLock::new(); + +impl ScalarUDFImpl for MakeLine { + fn as_any(&self) -> &dyn Any { + self + } + + fn name(&self) -> &str { + "st_makeline" + } + + fn signature(&self) -> &Signature { + &self.signature + } + + fn return_type(&self, _arg_types: &[DataType]) -> Result { + Err(DataFusionError::Internal("return_type".to_string())) + } + + fn return_field_from_args(&self, args: ReturnFieldArgs) -> Result { + let metadata = + Arc::new(Metadata::try_from(args.arg_fields[0].as_ref()).unwrap_or_default()); + let output_type = GeometryType::new(metadata).with_coord_type(self.coord_type); + Ok(Arc::new(output_type.to_field("", true))) + } + + fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result { + Ok(make_line_impl(args, self.coord_type)?) + } + + fn documentation(&self) -> Option<&Documentation> { + Some(MAKE_LINE_DOC.get_or_init(|| { + Documentation::builder( + DOC_SECTION_OTHER, + "Creates a LineString containing the points of two input geometries. Other geometries are decomposed into their component points.", + "ST_MakeLine(geom1, geom2)", + ) + .with_argument("geom1", "geometry") + .with_argument("geom2", "geometry") + .build() + })) + } +} + +fn make_line_impl( + args: ScalarFunctionArgs, + coord_type: CoordType, +) -> GeoDataFusionResult { + let arrays = ColumnarValue::values_to_arrays(&args.args)?; + let left_array = from_arrow_array(&arrays[0], &args.arg_fields[0])?; + let right_array = from_arrow_array(&arrays[1], &args.arg_fields[1])?; + let result = make_line_arrays(&left_array, &right_array, coord_type)?; + Ok(ColumnarValue::Array(result.into_array_ref())) +} + +fn make_line_arrays( + left: &dyn GeoArrowArray, + right: &dyn GeoArrowArray, + coord_type: CoordType, +) -> GeoArrowResult> { + downcast_geoarrow_array!(left, _make_line_left, right, coord_type) +} + +fn _make_line_left<'a>( + left: &'a impl GeoArrowArrayAccessor<'a>, + right: &dyn GeoArrowArray, + coord_type: CoordType, +) -> GeoArrowResult> { + downcast_geoarrow_array!(right, _make_line_both, left, coord_type) +} + +fn _make_line_both<'a, 'b>( + right: &'b impl GeoArrowArrayAccessor<'b>, + left: &'a impl GeoArrowArrayAccessor<'a>, + coord_type: CoordType, +) -> GeoArrowResult> { + let geom_type = GeometryType::new(Default::default()).with_coord_type(coord_type); + let mut builder = GeometryBuilder::new(geom_type); + + for (l, r) in left.iter().zip(right.iter()) { + match (l, r) { + (Some(left_geom), Some(right_geom)) => { + let left_geo = geometry_to_geo(&left_geom?)?; + let right_geo = geometry_to_geo(&right_geom?)?; + + let mut coords = collect_points(&left_geo); + coords.extend(collect_points(&right_geo)); + + if coords.len() >= 2 { + let line = geo::LineString::new(coords); + builder.push_geometry(Some(&geo::Geometry::LineString(line)))?; + } else { + builder.push_null(); + } + } + _ => builder.push_null(), + } + } + + Ok(Arc::new(builder.finish())) +} + +/// Extract all coordinate points from a geometry. +fn collect_points(geom: &geo::Geometry) -> Vec { + use geo::CoordsIter; + geom.coords_iter().collect() +} + +// --------------------------------------------------------------------------- +// ST_MakePolygon +// --------------------------------------------------------------------------- + +#[derive(Debug, Eq, PartialEq, Hash)] +pub struct MakePolygon { + coord_type: CoordType, +} + +impl MakePolygon { + pub fn new(coord_type: CoordType) -> Self { + Self { coord_type } + } +} + +impl Default for MakePolygon { + fn default() -> Self { + Self::new(Default::default()) + } +} + +static MAKE_POLYGON_DOC: OnceLock = OnceLock::new(); + +impl ScalarUDFImpl for MakePolygon { + fn as_any(&self) -> &dyn Any { + self + } + + fn name(&self) -> &str { + "st_makepolygon" + } + + fn signature(&self) -> &Signature { + any_single_geometry_type_input() + } + + fn return_type(&self, _arg_types: &[DataType]) -> Result { + Err(DataFusionError::Internal("return_type".to_string())) + } + + fn return_field_from_args(&self, args: ReturnFieldArgs) -> Result { + let metadata = + Arc::new(Metadata::try_from(args.arg_fields[0].as_ref()).unwrap_or_default()); + let output_type = GeometryType::new(metadata).with_coord_type(self.coord_type); + Ok(Arc::new(output_type.to_field("", true))) + } + + fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result { + Ok(make_polygon_impl(args, self.coord_type)?) + } + + fn documentation(&self) -> Option<&Documentation> { + Some(MAKE_POLYGON_DOC.get_or_init(|| { + Documentation::builder( + DOC_SECTION_OTHER, + "Creates a Polygon from a closed LineString shell.", + "ST_MakePolygon(linestring)", + ) + .with_argument("shell", "A closed LineString geometry") + .build() + })) + } +} + +fn make_polygon_impl( + args: ScalarFunctionArgs, + coord_type: CoordType, +) -> GeoDataFusionResult { + let arrays = ColumnarValue::values_to_arrays(&args.args)?; + let geo_array = from_arrow_array(&arrays[0], &args.arg_fields[0])?; + let result = make_polygon_array(&geo_array, coord_type)?; + Ok(ColumnarValue::Array(result.into_array_ref())) +} + +fn make_polygon_array( + array: &dyn GeoArrowArray, + coord_type: CoordType, +) -> GeoArrowResult> { + downcast_geoarrow_array!(array, _make_polygon_impl, coord_type) +} + +fn _make_polygon_impl<'a>( + array: &'a impl GeoArrowArrayAccessor<'a>, + coord_type: CoordType, +) -> GeoArrowResult> { + let geom_type = GeometryType::new(Default::default()).with_coord_type(coord_type); + let mut builder = GeometryBuilder::new(geom_type); + + for item in array.iter() { + if let Some(geom) = item { + let geo_geom = geometry_to_geo(&geom?)?; + match geo_geom { + geo::Geometry::LineString(mut ls) => { + // Auto-close if not already closed (matches PostGIS behavior) + if ls.0.len() >= 2 && ls.0.first() != ls.0.last() { + ls.0.push(ls.0[0]); + } + let polygon = geo::Polygon::new(ls, vec![]); + builder.push_geometry(Some(&geo::Geometry::Polygon(polygon)))?; + } + _ => { + // For non-linestring inputs, push null + builder.push_null(); + } + } + } else { + builder.push_null(); + } + } + + Ok(Arc::new(builder.finish())) +} + +// --------------------------------------------------------------------------- +// ST_MakeEnvelope +// --------------------------------------------------------------------------- + +#[derive(Debug, Eq, PartialEq, Hash)] +pub struct MakeEnvelope { + signature: Signature, + coord_type: CoordType, +} + +impl MakeEnvelope { + pub fn new(coord_type: CoordType) -> Self { + Self { + signature: Signature::one_of( + vec![ + TypeSignature::Exact(vec![ + DataType::Float64, + DataType::Float64, + DataType::Float64, + DataType::Float64, + ]), + TypeSignature::Exact(vec![ + DataType::Float64, + DataType::Float64, + DataType::Float64, + DataType::Float64, + DataType::Int64, + ]), + ], + Volatility::Immutable, + ), + coord_type, + } + } +} + +impl Default for MakeEnvelope { + fn default() -> Self { + Self::new(Default::default()) + } +} + +static MAKE_ENVELOPE_DOC: OnceLock = OnceLock::new(); + +impl ScalarUDFImpl for MakeEnvelope { + fn as_any(&self) -> &dyn Any { + self + } + + fn name(&self) -> &str { + "st_makeenvelope" + } + + fn signature(&self) -> &Signature { + &self.signature + } + + fn return_type(&self, _arg_types: &[DataType]) -> Result { + Err(DataFusionError::Internal("return_type".to_string())) + } + + fn return_field_from_args(&self, args: ReturnFieldArgs) -> Result { + let mut geom_type = + GeometryType::new(Default::default()).with_coord_type(self.coord_type); + + // If a 5th argument (SRID) is provided as scalar, use it + if let Some(srid) = args.scalar_arguments.get(4) { + if let Some(datafusion::scalar::ScalarValue::Int64(srid_val)) = srid { + let crs = geoarrow_schema::Crs::from_authority_code(format!( + "EPSG:{}", + match srid_val { + Some(v) => v, + None => return Ok(Arc::new(geom_type.to_field("", true))), + } + )); + geom_type = geom_type + .with_metadata(Arc::new(Metadata::new(crs, Default::default()))); + } + } + + Ok(Arc::new(geom_type.to_field("", true))) + } + + fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result { + Ok(make_envelope_impl(args, self.coord_type)?) + } + + fn documentation(&self) -> Option<&Documentation> { + Some(MAKE_ENVELOPE_DOC.get_or_init(|| { + Documentation::builder( + DOC_SECTION_OTHER, + "Creates a rectangular Polygon from minimum and maximum coordinate values. The polygon is formed in the coordinate plane with corners at (xmin, ymin), (xmin, ymax), (xmax, ymax), (xmax, ymin), (xmin, ymin).", + "ST_MakeEnvelope(xmin, ymin, xmax, ymax) or ST_MakeEnvelope(xmin, ymin, xmax, ymax, srid)", + ) + .with_argument("xmin", "minimum x value") + .with_argument("ymin", "minimum y value") + .with_argument("xmax", "maximum x value") + .with_argument("ymax", "maximum y value") + .with_argument("srid", "optional integer SRID value") + .build() + })) + } +} + +fn make_envelope_impl( + args: ScalarFunctionArgs, + coord_type: CoordType, +) -> GeoDataFusionResult { + let arrays = ColumnarValue::values_to_arrays(&args.args[..4])?; + let xmin = arrays[0].as_primitive::(); + let ymin = arrays[1].as_primitive::(); + let xmax = arrays[2].as_primitive::(); + let ymax = arrays[3].as_primitive::(); + + let geom_type = GeometryType::new(Default::default()).with_coord_type(coord_type); + let mut builder = GeometryBuilder::new(geom_type); + + for i in 0..xmin.len() { + if xmin.is_null(i) || ymin.is_null(i) || xmax.is_null(i) || ymax.is_null(i) { + builder.push_null(); + } else { + let x1 = xmin.value(i); + let y1 = ymin.value(i); + let x2 = xmax.value(i); + let y2 = ymax.value(i); + + let polygon = geo::Polygon::new( + geo::LineString::from(vec![ + (x1, y1), + (x1, y2), + (x2, y2), + (x2, y1), + (x1, y1), + ]), + vec![], + ); + builder.push_geometry(Some(&geo::Geometry::Polygon(polygon)))?; + } + } + + Ok(ColumnarValue::Array(builder.finish().to_array_ref())) +} + +#[cfg(test)] +mod test { + use std::sync::Arc; + + use arrow_array::{ArrayRef, Float64Array, RecordBatch}; + use arrow_schema::{Field, Schema}; + use datafusion::prelude::SessionContext; + use geoarrow_schema::Crs; + + use super::*; + use crate::udf::native::io::{AsText, GeomFromText}; + + #[tokio::test] + async fn test_make_line() { + let ctx = SessionContext::new(); + + ctx.register_udf(MakeLine::default().into()); + ctx.register_udf(GeomFromText::default().into()); + ctx.register_udf(AsText.into()); + + let sql_df = ctx + .sql(r#"SELECT ST_AsText(ST_MakeLine(ST_GeomFromText('POINT(0 0)'), ST_GeomFromText('POINT(1 1)')));"#) + .await + .unwrap(); + + let output_batches = sql_df.collect().await.unwrap(); + assert_eq!(output_batches.len(), 1); + let output_batch = &output_batches[0]; + + let str_arr = output_batch + .column(0) + .as_any() + .downcast_ref::() + .unwrap(); + let wkt = str_arr.value(0); + assert!( + wkt.contains("LINESTRING"), + "Expected LINESTRING in output, got: {}", + wkt + ); + } + + #[tokio::test] + async fn test_make_polygon() { + let ctx = SessionContext::new(); + + ctx.register_udf(MakePolygon::default().into()); + ctx.register_udf(GeomFromText::default().into()); + ctx.register_udf(AsText.into()); + + let sql_df = ctx + .sql(r#"SELECT ST_AsText(ST_MakePolygon(ST_GeomFromText('LINESTRING(0 0, 10 0, 10 10, 0 10, 0 0)')));"#) + .await + .unwrap(); + + let output_batches = sql_df.collect().await.unwrap(); + assert_eq!(output_batches.len(), 1); + let output_batch = &output_batches[0]; + + let str_arr = output_batch + .column(0) + .as_any() + .downcast_ref::() + .unwrap(); + let wkt = str_arr.value(0); + assert!( + wkt.contains("POLYGON"), + "Expected POLYGON in output, got: {}", + wkt + ); + } + + #[tokio::test] + async fn test_make_envelope() { + let ctx = SessionContext::new(); + + ctx.register_udf(MakeEnvelope::default().into()); + ctx.register_udf(AsText.into()); + + let sql_df = ctx + .sql(r#"SELECT ST_AsText(ST_MakeEnvelope(0.0, 0.0, 1.0, 1.0));"#) + .await + .unwrap(); + + let output_batches = sql_df.collect().await.unwrap(); + assert_eq!(output_batches.len(), 1); + let output_batch = &output_batches[0]; + + let str_arr = output_batch + .column(0) + .as_any() + .downcast_ref::() + .unwrap(); + let wkt = str_arr.value(0); + assert!( + wkt.contains("POLYGON"), + "Expected POLYGON in output, got: {}", + wkt + ); + } + + #[tokio::test] + async fn test_make_envelope_with_srid() { + let ctx = SessionContext::new(); + + ctx.register_udf(MakeEnvelope::default().into()); + + let sql_df = ctx + .sql(r#"SELECT ST_MakeEnvelope(0.0, 0.0, 1.0, 1.0, 4326);"#) + .await + .unwrap(); + + let output_batches = sql_df.collect().await.unwrap(); + assert_eq!(output_batches.len(), 1); + let output_batch = &output_batches[0]; + let output_schema = output_batch.schema(); + let output_field = output_schema.field(0); + + let geom_type = output_field.try_extension_type::().unwrap(); + assert_eq!( + geom_type.metadata().crs(), + &Crs::from_authority_code("EPSG:4326".to_string()) + ); + } + + #[tokio::test] + async fn test_make_envelope_from_table() { + let ctx = SessionContext::new(); + + ctx.register_udf(MakeEnvelope::default().into()); + ctx.register_udf(AsText.into()); + + let xmin: ArrayRef = Arc::new(Float64Array::from(vec![0.0])); + let ymin: ArrayRef = Arc::new(Float64Array::from(vec![0.0])); + let xmax: ArrayRef = Arc::new(Float64Array::from(vec![10.0])); + let ymax: ArrayRef = Arc::new(Float64Array::from(vec![10.0])); + + let schema = Schema::new([ + Arc::new(Field::new("xmin", DataType::Float64, true)), + Arc::new(Field::new("ymin", DataType::Float64, true)), + Arc::new(Field::new("xmax", DataType::Float64, true)), + Arc::new(Field::new("ymax", DataType::Float64, true)), + ]); + let batch = + RecordBatch::try_new(Arc::new(schema), vec![xmin, ymin, xmax, ymax]).unwrap(); + + ctx.register_batch("t", batch).unwrap(); + + let sql_df = ctx + .sql("SELECT ST_AsText(ST_MakeEnvelope(xmin, ymin, xmax, ymax)) FROM t;") + .await + .unwrap(); + + let output_batches = sql_df.collect().await.unwrap(); + assert_eq!(output_batches.len(), 1); + let output_batch = &output_batches[0]; + + let str_arr = output_batch + .column(0) + .as_any() + .downcast_ref::() + .unwrap(); + let wkt = str_arr.value(0); + assert!( + wkt.contains("POLYGON"), + "Expected POLYGON in output, got: {}", + wkt + ); + } +} diff --git a/rust/geodatafusion/src/udf/native/constructors/mod.rs b/rust/geodatafusion/src/udf/native/constructors/mod.rs index 18438bb..d153f22 100644 --- a/rust/geodatafusion/src/udf/native/constructors/mod.rs +++ b/rust/geodatafusion/src/udf/native/constructors/mod.rs @@ -1,5 +1,7 @@ +mod make; mod point; +pub use make::{MakeEnvelope, MakeLine, MakePolygon}; pub use point::{MakePoint, MakePointM, Point, PointM, PointZ, PointZM}; pub fn register(session_context: &datafusion::prelude::SessionContext) { @@ -9,4 +11,7 @@ pub fn register(session_context: &datafusion::prelude::SessionContext) { session_context.register_udf(PointM::default().into()); session_context.register_udf(PointZ::default().into()); session_context.register_udf(PointZM::default().into()); + session_context.register_udf(MakeLine::default().into()); + session_context.register_udf(MakePolygon::default().into()); + session_context.register_udf(MakeEnvelope::default().into()); } diff --git a/rust/geodatafusion/src/udf/native/io/ewkt.rs b/rust/geodatafusion/src/udf/native/io/ewkt.rs new file mode 100644 index 0000000..735e498 --- /dev/null +++ b/rust/geodatafusion/src/udf/native/io/ewkt.rs @@ -0,0 +1,232 @@ +use std::any::Any; +use std::sync::{Arc, OnceLock}; + +use arrow_array::builder::StringBuilder; +use arrow_array::Array; +use arrow_schema::{DataType, Field}; +use datafusion::error::Result; +use datafusion::logical_expr::scalar_doc_sections::DOC_SECTION_OTHER; +use datafusion::logical_expr::{ + ColumnarValue, Documentation, ReturnFieldArgs, ScalarFunctionArgs, ScalarUDFImpl, Signature, +}; +use geoarrow_array::array::from_arrow_array; +use geoarrow_array::cast::to_wkt; +use geoarrow_array::GeoArrowArray; +use geoarrow_schema::{CrsType, Metadata}; + +use crate::data_types::any_single_geometry_type_input; +use crate::error::GeoDataFusionResult; + +// --------------------------------------------------------------------------- +// ST_AsEWKT +// --------------------------------------------------------------------------- + +#[derive(Debug, Eq, PartialEq, Hash)] +pub struct AsEWKT; + +impl AsEWKT { + pub fn new() -> Self { + Self {} + } + + fn invoke_impl(&self, args: ScalarFunctionArgs) -> GeoDataFusionResult { + let array = &ColumnarValue::values_to_arrays(&args.args)?[0]; + let field = &args.arg_fields[0]; + let geo_array = from_arrow_array(&array, field.as_ref())?; + + // Get WKT representation + let wkt_arr = to_wkt::(geo_array.as_ref())?; + + // Extract SRID from metadata if present + let metadata = Metadata::try_from(field.as_ref()).ok(); + let srid_prefix = metadata + .as_ref() + .and_then(|m| { + let crs = m.crs(); + if crs.crs_type() == Some(CrsType::AuthorityCode) { + if let Some(val) = crs.crs_value() { + if let Some(code_str) = val.as_str() { + if let Some(srid) = code_str.strip_prefix("EPSG:") { + return Some(format!("SRID={};", srid)); + } + } + } + } + None + }) + .unwrap_or_default(); + + // Build output with SRID prefix + let str_arr = wkt_arr.into_array_ref(); + let wkt_string_arr = str_arr + .as_any() + .downcast_ref::() + .unwrap(); + + let mut builder = + StringBuilder::with_capacity(wkt_string_arr.len(), wkt_string_arr.len() * 64); + for i in 0..wkt_string_arr.len() { + if wkt_string_arr.is_null(i) { + builder.append_null(); + } else { + let wkt_val = wkt_string_arr.value(i); + let ewkt = format!("{}{}", srid_prefix, wkt_val); + builder.append_value(&ewkt); + } + } + + Ok(ColumnarValue::Array(Arc::new(builder.finish()))) + } +} + +impl Default for AsEWKT { + fn default() -> Self { + Self::new() + } +} + +static AS_EWKT_DOC: OnceLock = OnceLock::new(); + +impl ScalarUDFImpl for AsEWKT { + fn as_any(&self) -> &dyn Any { + self + } + + fn name(&self) -> &str { + "st_asewkt" + } + + fn signature(&self) -> &Signature { + any_single_geometry_type_input() + } + + fn return_type(&self, _arg_types: &[DataType]) -> Result { + Ok(DataType::Utf8) + } + + fn return_field_from_args(&self, args: ReturnFieldArgs) -> Result> { + let input_field = &args.arg_fields[0]; + Ok(Field::new(input_field.name(), DataType::Utf8, input_field.is_nullable()).into()) + } + + fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result { + Ok(self.invoke_impl(args)?) + } + + fn documentation(&self) -> Option<&Documentation> { + Some(AS_EWKT_DOC.get_or_init(|| { + Documentation::builder( + DOC_SECTION_OTHER, + "Returns the Extended Well-Known Text (EWKT) representation of the geometry with SRID metadata.", + "ST_AsEWKT(geometry)", + ) + .with_argument("g1", "geometry") + .build() + })) + } +} + +#[cfg(test)] +mod test { + use std::sync::Arc; + + use arrow_schema::Schema; + use datafusion::prelude::SessionContext; + use geoarrow_array::test::point; + use geoarrow_schema::{CoordType, Crs, Dimension, Metadata}; + + use super::*; + + #[tokio::test] + async fn test_as_ewkt_with_srid() { + let ctx = SessionContext::new(); + + let crs = Crs::from_authority_code("EPSG:4326".to_string()); + let metadata = Arc::new(Metadata::new(crs, Default::default())); + + let geo_arr = point::array(CoordType::Separated, Dimension::XY).with_metadata(metadata); + + let arr = geo_arr.to_array_ref(); + let field = geo_arr.data_type().to_field("geometry", true); + let schema = Schema::new([Arc::new(field)]); + let batch = + arrow_array::RecordBatch::try_new(Arc::new(schema), vec![arr]).unwrap(); + + ctx.register_batch("t", batch).unwrap(); + + ctx.register_udf(AsEWKT::new().into()); + + let sql_df = ctx + .sql("SELECT ST_AsEWKT(geometry) FROM t;") + .await + .unwrap(); + + let output_batches = sql_df.collect().await.unwrap(); + assert_eq!(output_batches.len(), 1); + let output_batch = &output_batches[0]; + + let str_arr = output_batch + .column(0) + .as_any() + .downcast_ref::() + .unwrap(); + + assert!(str_arr.len() > 0); + let first = str_arr.value(0); + assert!( + first.starts_with("SRID=4326;"), + "Expected EWKT to start with 'SRID=4326;', got: {}", + first + ); + assert!( + first.contains("POINT"), + "Expected EWKT to contain 'POINT', got: {}", + first + ); + } + + #[tokio::test] + async fn test_as_ewkt_without_srid() { + let ctx = SessionContext::new(); + + let geo_arr = point::array(CoordType::Separated, Dimension::XY); + + let arr = geo_arr.to_array_ref(); + let field = geo_arr.data_type().to_field("geometry", true); + let schema = Schema::new([Arc::new(field)]); + let batch = + arrow_array::RecordBatch::try_new(Arc::new(schema), vec![arr]).unwrap(); + + ctx.register_batch("t", batch).unwrap(); + + ctx.register_udf(AsEWKT::new().into()); + + let sql_df = ctx + .sql("SELECT ST_AsEWKT(geometry) FROM t;") + .await + .unwrap(); + + let output_batches = sql_df.collect().await.unwrap(); + assert_eq!(output_batches.len(), 1); + let output_batch = &output_batches[0]; + + let str_arr = output_batch + .column(0) + .as_any() + .downcast_ref::() + .unwrap(); + + assert!(str_arr.len() > 0); + let first = str_arr.value(0); + assert!( + !first.starts_with("SRID="), + "Expected no SRID prefix when no CRS is set, got: {}", + first + ); + assert!( + first.contains("POINT"), + "Expected EWKT to contain 'POINT', got: {}", + first + ); + } +} diff --git a/rust/geodatafusion/src/udf/native/io/geojson.rs b/rust/geodatafusion/src/udf/native/io/geojson.rs new file mode 100644 index 0000000..ba749f0 --- /dev/null +++ b/rust/geodatafusion/src/udf/native/io/geojson.rs @@ -0,0 +1,379 @@ +use std::any::Any; +use std::str::FromStr; +use std::sync::{Arc, OnceLock}; + +use arrow_array::builder::StringBuilder; +use arrow_array::Array; +use arrow_schema::{DataType, Field}; +use datafusion::error::{DataFusionError, Result}; +use datafusion::logical_expr::scalar_doc_sections::DOC_SECTION_OTHER; +use datafusion::logical_expr::{ + ColumnarValue, Documentation, ReturnFieldArgs, ScalarFunctionArgs, ScalarUDFImpl, Signature, + Volatility, +}; +use geoarrow_array::array::from_arrow_array; +use geoarrow_array::builder::GeometryBuilder; +use geoarrow_array::{GeoArrowArray, GeoArrowArrayAccessor, downcast_geoarrow_array}; +use geoarrow_expr_geo::util::to_geo::geometry_to_geo; +use geoarrow_schema::error::GeoArrowResult; +use geoarrow_schema::{CoordType, GeometryType, Metadata}; + +use crate::data_types::any_single_geometry_type_input; +use crate::error::GeoDataFusionResult; + +// --------------------------------------------------------------------------- +// ST_AsGeoJSON +// --------------------------------------------------------------------------- + +#[derive(Debug, Eq, PartialEq, Hash)] +pub struct AsGeoJSON; + +impl AsGeoJSON { + pub fn new() -> Self { + Self {} + } + + fn invoke_impl(&self, args: ScalarFunctionArgs) -> GeoDataFusionResult { + let array = &ColumnarValue::values_to_arrays(&args.args)?[0]; + let field = &args.arg_fields[0]; + let geo_array = from_arrow_array(&array, field.as_ref())?; + let result = as_geojson_array(&geo_array)?; + Ok(ColumnarValue::Array(Arc::new(result))) + } +} + +impl Default for AsGeoJSON { + fn default() -> Self { + Self::new() + } +} + +static AS_GEOJSON_DOC: OnceLock = OnceLock::new(); + +impl ScalarUDFImpl for AsGeoJSON { + fn as_any(&self) -> &dyn Any { + self + } + + fn name(&self) -> &str { + "st_asgeojson" + } + + fn signature(&self) -> &Signature { + any_single_geometry_type_input() + } + + fn return_type(&self, _arg_types: &[DataType]) -> Result { + Ok(DataType::Utf8) + } + + fn return_field_from_args(&self, args: ReturnFieldArgs) -> Result> { + let input_field = &args.arg_fields[0]; + Ok(Field::new(input_field.name(), DataType::Utf8, input_field.is_nullable()).into()) + } + + fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result { + Ok(self.invoke_impl(args)?) + } + + fn documentation(&self) -> Option<&Documentation> { + Some(AS_GEOJSON_DOC.get_or_init(|| { + Documentation::builder( + DOC_SECTION_OTHER, + "Returns the geometry as a GeoJSON element.", + "ST_AsGeoJSON(geometry)", + ) + .with_argument("g1", "geometry") + .build() + })) + } +} + +fn as_geojson_array( + array: &dyn GeoArrowArray, +) -> GeoArrowResult { + downcast_geoarrow_array!(array, _as_geojson_impl) +} + +fn _as_geojson_impl<'a>( + array: &'a impl GeoArrowArrayAccessor<'a>, +) -> GeoArrowResult { + let mut builder = StringBuilder::with_capacity(array.len(), array.len() * 64); + for item in array.iter() { + if let Some(geom) = item { + let geo_geom = geometry_to_geo(&geom?)?; + let geojson_value = geojson::Value::from(&geo_geom); + let geojson_geom = geojson::Geometry::new(geojson_value); + let json_string = serde_json::to_string(&geojson_geom) + .map_err(|e| geoarrow_schema::error::GeoArrowError::InvalidGeoArrow(e.to_string()))?; + builder.append_value(&json_string); + } else { + builder.append_null(); + } + } + Ok(builder.finish()) +} + +// --------------------------------------------------------------------------- +// ST_GeomFromGeoJSON +// --------------------------------------------------------------------------- + +#[derive(Debug, Eq, PartialEq, Hash)] +pub struct GeomFromGeoJSON { + signature: Signature, + coord_type: CoordType, +} + +impl GeomFromGeoJSON { + pub fn new(coord_type: CoordType) -> Self { + Self { + signature: Signature::uniform( + 1, + vec![DataType::Utf8, DataType::LargeUtf8, DataType::Utf8View], + Volatility::Immutable, + ), + coord_type, + } + } + + fn invoke_impl(&self, args: ScalarFunctionArgs) -> GeoDataFusionResult { + let array = &ColumnarValue::values_to_arrays(&args.args)?[0]; + let field = &args.arg_fields[0]; + + let string_arr = array.as_ref(); + let len = string_arr.len(); + + let geom_type = GeometryType::new(Default::default()).with_coord_type(self.coord_type); + let mut builder = GeometryBuilder::new(geom_type); + + match field.data_type() { + DataType::Utf8 => { + let str_arr = string_arr + .as_any() + .downcast_ref::() + .unwrap(); + for i in 0..len { + if str_arr.is_null(i) { + builder.push_null(); + } else { + let geojson_str = str_arr.value(i); + let geo_geom = parse_geojson_to_geo(geojson_str)?; + builder.push_geometry(Some(&geo_geom))?; + } + } + } + DataType::LargeUtf8 => { + let str_arr = string_arr + .as_any() + .downcast_ref::() + .unwrap(); + for i in 0..len { + if str_arr.is_null(i) { + builder.push_null(); + } else { + let geojson_str = str_arr.value(i); + let geo_geom = parse_geojson_to_geo(geojson_str)?; + builder.push_geometry(Some(&geo_geom))?; + } + } + } + DataType::Utf8View => { + let str_arr = string_arr + .as_any() + .downcast_ref::() + .unwrap(); + for i in 0..len { + if str_arr.is_null(i) { + builder.push_null(); + } else { + let geojson_str = str_arr.value(i); + let geo_geom = parse_geojson_to_geo(geojson_str)?; + builder.push_geometry(Some(&geo_geom))?; + } + } + } + _ => { + return Err(DataFusionError::Internal(format!( + "Unexpected data type for ST_GeomFromGeoJSON: {:?}", + string_arr.data_type() + )) + .into()) + } + } + + Ok(ColumnarValue::Array(builder.finish().to_array_ref())) + } +} + +fn parse_geojson_to_geo(geojson_str: &str) -> GeoDataFusionResult { + use geojson::GeoJson; + + let geojson = GeoJson::from_str(geojson_str).map_err(|e| { + DataFusionError::Execution(format!("Failed to parse GeoJSON: {}", e)) + })?; + + let geometry: geojson::Geometry = match geojson { + GeoJson::Geometry(g) => g, + GeoJson::Feature(f) => f.geometry.ok_or_else(|| { + DataFusionError::Execution("GeoJSON Feature has no geometry".to_string()) + })?, + GeoJson::FeatureCollection(_) => { + return Err(DataFusionError::Execution( + "GeoJSON FeatureCollection is not supported; pass a single geometry".to_string(), + ) + .into()); + } + }; + + let geo_geom: geo::Geometry = + geo::Geometry::try_from(geometry).map_err(|e| { + DataFusionError::Execution(format!("Failed to convert GeoJSON to geo type: {}", e)) + })?; + + Ok(geo_geom) +} + +impl Default for GeomFromGeoJSON { + fn default() -> Self { + Self::new(Default::default()) + } +} + +static GEOM_FROM_GEOJSON_DOC: OnceLock = OnceLock::new(); + +impl ScalarUDFImpl for GeomFromGeoJSON { + fn as_any(&self) -> &dyn Any { + self + } + + fn name(&self) -> &str { + "st_geomfromgeojson" + } + + fn signature(&self) -> &Signature { + &self.signature + } + + fn return_type(&self, _arg_types: &[DataType]) -> Result { + Err(DataFusionError::Internal("return_type".to_string())) + } + + fn return_field_from_args(&self, args: ReturnFieldArgs) -> Result> { + let input_field = &args.arg_fields[0]; + let metadata = Arc::new(Metadata::try_from(input_field.as_ref()).unwrap_or_default()); + let geom_type = GeometryType::new(metadata).with_coord_type(self.coord_type); + Ok(geom_type + .to_field(input_field.name(), input_field.is_nullable()) + .into()) + } + + fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result { + Ok(self.invoke_impl(args)?) + } + + fn documentation(&self) -> Option<&Documentation> { + Some(GEOM_FROM_GEOJSON_DOC.get_or_init(|| { + Documentation::builder( + DOC_SECTION_OTHER, + "Takes a GeoJSON representation of a geometry and creates a geometry object.", + "ST_GeomFromGeoJSON(text)", + ) + .with_argument("geojson", "GeoJSON string") + .build() + })) + } +} + +#[cfg(test)] +mod test { + use datafusion::prelude::SessionContext; + + use super::*; + use crate::udf::native::io::GeomFromText; + + #[tokio::test] + async fn test_as_geojson() { + let ctx = SessionContext::new(); + + ctx.register_udf(AsGeoJSON::new().into()); + ctx.register_udf(GeomFromText::default().into()); + + // Use ST_GeomFromText to create geometry via SQL to avoid empty-point issues + // with the test helper arrays from geoarrow_array. + let sql_df = ctx + .sql("SELECT ST_AsGeoJSON(ST_GeomFromText('POINT(30 10)'));") + .await + .unwrap(); + + let output_batches = sql_df.collect().await.unwrap(); + assert_eq!(output_batches.len(), 1); + let output_batch = &output_batches[0]; + + // Verify output is Utf8 + let output_schema = output_batch.schema(); + let output_field = output_schema.field(0); + assert_eq!(output_field.data_type(), &DataType::Utf8); + + // Verify the GeoJSON contains expected structure + let str_arr = output_batch + .column(0) + .as_any() + .downcast_ref::() + .unwrap(); + assert_eq!(str_arr.len(), 1); + let first = str_arr.value(0); + assert!(first.contains("\"type\"")); + assert!(first.contains("\"coordinates\"")); + assert!(first.contains("30")); + assert!(first.contains("10")); + } + + #[tokio::test] + async fn test_geom_from_geojson() { + let ctx = SessionContext::new(); + + ctx.register_udf(GeomFromGeoJSON::new(CoordType::Separated).into()); + ctx.register_udf(AsGeoJSON::new().into()); + + let sql_df = ctx + .sql(r#"SELECT ST_AsGeoJSON(ST_GeomFromGeoJSON('{"type":"Point","coordinates":[30,10]}'));"#) + .await + .unwrap(); + + let output_batches = sql_df.collect().await.unwrap(); + assert_eq!(output_batches.len(), 1); + let output_batch = &output_batches[0]; + let str_arr = output_batch + .column(0) + .as_any() + .downcast_ref::() + .unwrap(); + let result = str_arr.value(0); + assert!(result.contains("30")); + assert!(result.contains("10")); + } + + #[tokio::test] + async fn test_geom_from_geojson_polygon() { + let ctx = SessionContext::new(); + + ctx.register_udf(GeomFromGeoJSON::new(CoordType::Separated).into()); + ctx.register_udf(AsGeoJSON::new().into()); + + let sql_df = ctx + .sql(r#"SELECT ST_AsGeoJSON(ST_GeomFromGeoJSON('{"type":"Polygon","coordinates":[[[0,0],[10,0],[10,10],[0,10],[0,0]]]}'));"#) + .await + .unwrap(); + + let output_batches = sql_df.collect().await.unwrap(); + assert_eq!(output_batches.len(), 1); + let output_batch = &output_batches[0]; + let str_arr = output_batch + .column(0) + .as_any() + .downcast_ref::() + .unwrap(); + let result = str_arr.value(0); + assert!(result.contains("Polygon")); + } +} diff --git a/rust/geodatafusion/src/udf/native/io/mod.rs b/rust/geodatafusion/src/udf/native/io/mod.rs index 376bfb6..b0adc8b 100644 --- a/rust/geodatafusion/src/udf/native/io/mod.rs +++ b/rust/geodatafusion/src/udf/native/io/mod.rs @@ -1,8 +1,12 @@ //! Geometry Input and Output +mod ewkt; +mod geojson; mod wkb; mod wkt; +pub use ewkt::AsEWKT; +pub use geojson::{AsGeoJSON, GeomFromGeoJSON}; pub use wkb::{AsBinary, GeomFromWKB}; pub use wkt::{AsText, GeomFromText}; @@ -11,4 +15,7 @@ pub fn register(session_context: &datafusion::prelude::SessionContext) { session_context.register_udf(GeomFromWKB::default().into()); session_context.register_udf(AsText.into()); session_context.register_udf(GeomFromText::default().into()); + session_context.register_udf(AsGeoJSON.into()); + session_context.register_udf(GeomFromGeoJSON::default().into()); + session_context.register_udf(AsEWKT.into()); }