Skip to content

Commit 51a5640

Browse files
osipovartemeadgbear
authored andcommitted
Add ST_PointN and ST_Endpoint (#322)
* Add ST_MakePolygon, st_polygon * Add docs * Add ST_dimention * Add license * Add ST_Endpoint, ST_PointN * Reimplement ST_Point to work with snowflake linestring
1 parent 2146954 commit 51a5640

File tree

2 files changed

+364
-0
lines changed

2 files changed

+364
-0
lines changed
Lines changed: 360 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,360 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
use crate::datafusion::functions::geospatial::data_types::{
19+
parse_to_native_array, BOX2D_TYPE, BOX3D_TYPE, GEOMETRY_TYPE, LINE_STRING_TYPE, POINT2D_TYPE,
20+
POINT3D_TYPE, POLYGON_2D_TYPE,
21+
};
22+
use crate::datafusion::functions::geospatial::error as geo_error;
23+
use crate::datafusion::functions::timestamp_from_parts::to_primitive_array;
24+
use arrow_array::types::Int64Type;
25+
use arrow_schema::DataType;
26+
use datafusion::logical_expr::scalar_doc_sections::DOC_SECTION_OTHER;
27+
use datafusion::logical_expr::{
28+
ColumnarValue, Documentation, ScalarUDFImpl, Signature, TypeSignature, Volatility,
29+
};
30+
use datafusion_common::{DataFusionError, Result};
31+
use geo_traits::LineStringTrait;
32+
use geoarrow::array::{AsNativeArray, CoordType, PointBuilder};
33+
use geoarrow::datatypes::Dimension;
34+
use geoarrow::error::GeoArrowError;
35+
use geoarrow::trait_::ArrayAccessor;
36+
use geoarrow::ArrayBase;
37+
use snafu::ResultExt;
38+
use std::any::Any;
39+
use std::sync::{Arc, OnceLock};
40+
41+
#[derive(Debug)]
42+
pub struct EndPoint {
43+
signature: Signature,
44+
}
45+
46+
impl EndPoint {
47+
pub fn new() -> Self {
48+
Self {
49+
signature: Signature::exact(vec![LINE_STRING_TYPE.into()], Volatility::Immutable),
50+
}
51+
}
52+
}
53+
54+
static DOCUMENTATION: OnceLock<Documentation> = OnceLock::new();
55+
56+
impl ScalarUDFImpl for EndPoint {
57+
fn as_any(&self) -> &dyn Any {
58+
self
59+
}
60+
61+
fn name(&self) -> &'static str {
62+
"st_endpoint"
63+
}
64+
65+
fn signature(&self) -> &Signature {
66+
&self.signature
67+
}
68+
69+
fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
70+
Ok(POINT2D_TYPE.into())
71+
}
72+
73+
fn invoke(&self, args: &[ColumnarValue]) -> Result<ColumnarValue> {
74+
get_n_point(args, None)
75+
}
76+
77+
fn documentation(&self) -> Option<&Documentation> {
78+
Some(DOCUMENTATION.get_or_init(|| {
79+
Documentation::builder(
80+
DOC_SECTION_OTHER,
81+
"Returns the last point of a LINESTRING geometry as a POINT. Returns NULL if the input is not a LINESTRING",
82+
"ST_EndPoint(line_string)")
83+
.with_argument("g1", "geometry")
84+
.build()
85+
}))
86+
}
87+
}
88+
89+
#[derive(Debug)]
90+
pub struct StartPoint {
91+
signature: Signature,
92+
}
93+
94+
impl StartPoint {
95+
pub fn new() -> Self {
96+
Self {
97+
signature: Signature::exact(vec![LINE_STRING_TYPE.into()], Volatility::Immutable),
98+
}
99+
}
100+
}
101+
102+
impl ScalarUDFImpl for StartPoint {
103+
fn as_any(&self) -> &dyn Any {
104+
self
105+
}
106+
107+
fn name(&self) -> &'static str {
108+
"st_startpoint"
109+
}
110+
111+
fn signature(&self) -> &Signature {
112+
&self.signature
113+
}
114+
115+
fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
116+
Ok(POINT2D_TYPE.into())
117+
}
118+
119+
fn invoke(&self, args: &[ColumnarValue]) -> Result<ColumnarValue> {
120+
get_n_point(args, Some(1))
121+
}
122+
123+
fn documentation(&self) -> Option<&Documentation> {
124+
Some(DOCUMENTATION.get_or_init(|| {
125+
Documentation::builder(
126+
DOC_SECTION_OTHER,
127+
"Returns the first point of a LINESTRING geometry as a POINT.",
128+
"ST_StartPoint(line_string)",
129+
)
130+
.with_argument("g1", "geometry")
131+
.build()
132+
}))
133+
}
134+
}
135+
136+
#[derive(Debug)]
137+
pub struct PointN {
138+
signature: Signature,
139+
}
140+
141+
impl PointN {
142+
pub fn new() -> Self {
143+
Self {
144+
signature: Signature::one_of(
145+
vec![
146+
TypeSignature::Exact(vec![POINT2D_TYPE.into(), DataType::Int64]),
147+
TypeSignature::Exact(vec![POINT3D_TYPE.into(), DataType::Int64]),
148+
TypeSignature::Exact(vec![BOX2D_TYPE.into(), DataType::Int64]),
149+
TypeSignature::Exact(vec![BOX3D_TYPE.into(), DataType::Int64]),
150+
TypeSignature::Exact(vec![LINE_STRING_TYPE.into(), DataType::Int64]),
151+
TypeSignature::Exact(vec![POLYGON_2D_TYPE.into(), DataType::Int64]),
152+
TypeSignature::Exact(vec![POINT2D_TYPE.into(), DataType::Int64]),
153+
TypeSignature::Exact(vec![GEOMETRY_TYPE.into(), DataType::Int64]),
154+
],
155+
Volatility::Immutable,
156+
),
157+
}
158+
}
159+
}
160+
161+
impl ScalarUDFImpl for PointN {
162+
fn as_any(&self) -> &dyn Any {
163+
self
164+
}
165+
166+
fn name(&self) -> &'static str {
167+
"st_pointn"
168+
}
169+
170+
fn signature(&self) -> &Signature {
171+
&self.signature
172+
}
173+
174+
fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
175+
Ok(POINT2D_TYPE.into())
176+
}
177+
178+
fn invoke(&self, args: &[ColumnarValue]) -> Result<ColumnarValue> {
179+
if args.len() < 2 {
180+
return Err(DataFusionError::Execution(
181+
"Expected two arguments in ST_PointN".to_string(),
182+
));
183+
}
184+
let index = to_primitive_array::<Int64Type>(&args[1])?.value(0);
185+
get_n_point(args, Some(index))
186+
}
187+
188+
fn documentation(&self) -> Option<&Documentation> {
189+
Some(DOCUMENTATION.get_or_init(|| {
190+
Documentation::builder(
191+
DOC_SECTION_OTHER,
192+
"Returns a Point at a specified index in a LineString. Returns NULL if the input is not a LINESTRING",
193+
"ST_PointN(line_string)")
194+
.with_argument("g1", "geometry")
195+
.build()
196+
}))
197+
}
198+
}
199+
200+
fn get_n_point(args: &[ColumnarValue], n: Option<i64>) -> Result<ColumnarValue> {
201+
let array = ColumnarValue::values_to_arrays(args)?
202+
.into_iter()
203+
.next()
204+
.ok_or_else(|| DataFusionError::Execution("Expected at least one argument".to_string()))?;
205+
206+
let native_array = parse_to_native_array(&array)?;
207+
let native_array_ref = native_array.as_ref();
208+
let line_string_array = native_array_ref
209+
.as_line_string_opt()
210+
.ok_or(GeoArrowError::General(
211+
"Expected Geometry-typed array".to_string(),
212+
))
213+
.context(geo_error::GeoArrowSnafu)?;
214+
215+
let mut output_builder = PointBuilder::with_capacity_and_options(
216+
Dimension::XY,
217+
line_string_array.len(),
218+
CoordType::Separated,
219+
Arc::default(),
220+
);
221+
222+
for line in line_string_array.iter() {
223+
if let Some(line_string) = line {
224+
let pos = if let Some(n) = n {
225+
let index = if n < 0 {
226+
line_string.num_coords().try_into().unwrap_or(0) + n
227+
} else {
228+
n - 1
229+
};
230+
index
231+
.try_into()
232+
.map_err(|_| DataFusionError::Execution("Index out of bounds".to_string()))?
233+
} else {
234+
line_string.num_coords() - 1
235+
};
236+
output_builder.push_coord(line_string.coord(pos).as_ref());
237+
} else {
238+
output_builder.push_null();
239+
}
240+
}
241+
242+
Ok(output_builder.finish().into_array_ref().into())
243+
}
244+
245+
#[cfg(test)]
246+
mod tests {
247+
use super::*;
248+
use arrow_array::Array;
249+
use datafusion::logical_expr::ColumnarValue;
250+
use geo_types::line_string;
251+
use geoarrow::array::{LineStringBuilder, PointArray};
252+
use geoarrow::datatypes::Dimension;
253+
use geoarrow::trait_::ArrayAccessor;
254+
use geozero::ToWkt;
255+
256+
#[test]
257+
#[allow(clippy::unwrap_used)]
258+
fn test_start_point() {
259+
let data = vec![
260+
line_string![(x: 1., y: 1.), (x: 1., y: 0.), (x: 1., y: 1.)],
261+
line_string![(x: 2., y: 2.), (x: 3., y: 2.), (x: 3., y: 3.)],
262+
line_string![(x: 2., y: 2.), (x: 3., y: 2.)],
263+
];
264+
let array = LineStringBuilder::from_line_strings(
265+
&data,
266+
Dimension::XY,
267+
CoordType::Separated,
268+
Arc::default(),
269+
)
270+
.finish();
271+
272+
let data = array.to_array_ref();
273+
let args = vec![ColumnarValue::Array(data)];
274+
let start_point = StartPoint::new();
275+
let result = start_point.invoke_batch(&args, 3).unwrap();
276+
let result = result.to_array(3).unwrap();
277+
assert_eq!(result.data_type(), &POINT2D_TYPE.into());
278+
let result = PointArray::try_from((result.as_ref(), Dimension::XY)).unwrap();
279+
assert_eq!(result.get(0).unwrap().to_wkt().unwrap(), "POINT(1 1)");
280+
assert_eq!(result.get(1).unwrap().to_wkt().unwrap(), "POINT(2 2)");
281+
assert_eq!(result.get(2).unwrap().to_wkt().unwrap(), "POINT(2 2)");
282+
}
283+
284+
#[test]
285+
#[allow(clippy::unwrap_used)]
286+
fn test_end_point() {
287+
let data = vec![
288+
line_string![(x: 0., y: 0.), (x: 1., y: 0.), (x: 1., y: 1.)],
289+
line_string![(x: 2., y: 2.), (x: 3., y: 2.), (x: 3., y: 3.)],
290+
line_string![(x: 2., y: 2.), (x: 3., y: 2.)],
291+
];
292+
let array = LineStringBuilder::from_line_strings(
293+
&data,
294+
Dimension::XY,
295+
CoordType::Separated,
296+
Arc::default(),
297+
)
298+
.finish();
299+
300+
let data = array.to_array_ref();
301+
let args = vec![ColumnarValue::Array(data)];
302+
let end_point = EndPoint::new();
303+
let result = end_point.invoke_batch(&args, 3).unwrap();
304+
let result = result.to_array(3).unwrap();
305+
assert_eq!(result.data_type(), &POINT2D_TYPE.into());
306+
let result = PointArray::try_from((result.as_ref(), Dimension::XY)).unwrap();
307+
assert_eq!(result.get(0).unwrap().to_wkt().unwrap(), "POINT(1 1)");
308+
assert_eq!(result.get(1).unwrap().to_wkt().unwrap(), "POINT(3 3)");
309+
assert_eq!(result.get(2).unwrap().to_wkt().unwrap(), "POINT(3 2)");
310+
}
311+
312+
#[test]
313+
#[allow(clippy::unwrap_used)]
314+
fn test_point_n() {
315+
let data = vec![
316+
line_string![(x: 0., y: 0.), (x: 1., y: 0.), (x: 1., y: 1.), (x: 4.1, y: 4.1)],
317+
line_string![(x: 2., y: 2.), (x: 3., y: 2.), (x: 3., y: 3.)],
318+
line_string![(x: 2., y: 2.), (x: 4., y: 2.)],
319+
];
320+
let array = LineStringBuilder::from_line_strings(
321+
&data,
322+
Dimension::XY,
323+
CoordType::Separated,
324+
Arc::default(),
325+
)
326+
.finish();
327+
328+
let cases: [(i64, bool, [&str; 3]); 5] = [
329+
(1, true, ["POINT(0 0)", "POINT(2 2)", "POINT(2 2)"]),
330+
(2, true, ["POINT(1 0)", "POINT(3 2)", "POINT(4 2)"]),
331+
(-1, true, ["POINT(4.1 4.1)", "POINT(3 3)", "POINT(4 2)"]),
332+
(-2, true, ["POINT(1 1)", "POINT(3 2)", "POINT(2 2)"]),
333+
(-10, false, ["", "", ""]),
334+
];
335+
336+
for (index, ok, exp) in cases {
337+
let data = array.to_array_ref();
338+
let args = vec![
339+
ColumnarValue::Array(data),
340+
ColumnarValue::Scalar(index.into()),
341+
];
342+
let point_n = PointN::new();
343+
let result = point_n.invoke_batch(&args, 3);
344+
345+
if ok {
346+
let result = result.unwrap().to_array(3).unwrap();
347+
assert_eq!(result.data_type(), &POINT2D_TYPE.into());
348+
let result = PointArray::try_from((result.as_ref(), Dimension::XY)).unwrap();
349+
assert_eq!(result.get(0).unwrap().to_wkt().unwrap(), exp[0]);
350+
assert_eq!(result.get(1).unwrap().to_wkt().unwrap(), exp[1]);
351+
assert_eq!(result.get(2).unwrap().to_wkt().unwrap(), exp[2]);
352+
} else {
353+
assert_eq!(
354+
result.err().unwrap().to_string(),
355+
"Execution error: Index out of bounds"
356+
);
357+
}
358+
}
359+
}
360+
}

crates/runtime/src/datafusion/functions/geospatial/accessors/mod.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,14 @@
1616
// under the License.
1717

1818
mod dim;
19+
mod line_string;
1920

2021
use datafusion::prelude::SessionContext;
2122

2223
/// Register all provided [geo] functions for constructing geometries
2324
pub fn register_udfs(ctx: &SessionContext) {
2425
ctx.register_udf(dim::GeomDimension::new().into());
26+
ctx.register_udf(line_string::StartPoint::new().into());
27+
ctx.register_udf(line_string::EndPoint::new().into());
28+
ctx.register_udf(line_string::PointN::new().into());
2529
}

0 commit comments

Comments
 (0)