diff --git a/common/src/main/java/org/apache/sedona/common/Constructors.java b/common/src/main/java/org/apache/sedona/common/Constructors.java
index 3cd4729243..b53e8162dd 100644
--- a/common/src/main/java/org/apache/sedona/common/Constructors.java
+++ b/common/src/main/java/org/apache/sedona/common/Constructors.java
@@ -22,6 +22,7 @@
 import javax.xml.parsers.ParserConfigurationException;
 import org.apache.sedona.common.enums.FileDataSplitter;
 import org.apache.sedona.common.enums.GeometryType;
+import org.apache.sedona.common.geometryObjects.Geography;
 import org.apache.sedona.common.utils.FormatUtils;
 import org.apache.sedona.common.utils.GeoHashDecoder;
 import org.locationtech.jts.geom.*;
@@ -44,6 +45,20 @@ public static Geometry geomFromWKT(String wkt, int srid) throws ParseException {
     return new WKTReader(geometryFactory).read(wkt);
   }
 
+  /**
+   * Builds a {@link Geography} from WKT by delegating to {@link #geomFromWKT(String, int)}.
+   *
+   * @param wkt the WKT text to parse
+   * @param srid the SRID passed through to {@code geomFromWKT}
+   * @return the wrapped geometry, or {@code null} if {@code geomFromWKT} produced no geometry
+   * @throws ParseException propagated from {@code geomFromWKT}
+   */
+  public static Geography geogFromWKT(String wkt, int srid) throws ParseException {
+    Geometry geometry = geomFromWKT(wkt, srid);
+    // Never wrap null: Geography.toString() dereferences the wrapped geometry.
+    return geometry == null ? null : new Geography(geometry);
+  }
+
   public static Geometry geomFromEWKT(String ewkt) throws ParseException {
     if (ewkt == null) {
       return null;
diff --git a/common/src/main/java/org/apache/sedona/common/Functions.java b/common/src/main/java/org/apache/sedona/common/Functions.java
index b5a181aa29..b743756fbb 100644
--- a/common/src/main/java/org/apache/sedona/common/Functions.java
+++ b/common/src/main/java/org/apache/sedona/common/Functions.java
@@ -29,6 +29,7 @@
 import java.util.stream.Collectors;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.sedona.common.geometryObjects.Circle;
+import org.apache.sedona.common.geometryObjects.Geography;
 import org.apache.sedona.common.sphere.Spheroid;
 import org.apache.sedona.common.subDivide.GeometrySubDivider;
 import org.apache.sedona.common.utils.*;
@@ -784,6 +785,17 @@ public static byte[] asEWKB(Geometry geometry) {
     return GeomUtils.getEWKB(geometry);
   }
 
+  /**
+   * Encodes the geometry wrapped by {@code geography} via {@link #asEWKB(Geometry)}.
+   *
+   * @param geography the geography to encode; may be {@code null}
+   * @return the result of {@code asEWKB}, or {@code null} when {@code geography} is {@code null}
+   */
+  public static byte[] geogAsEWKB(Geography geography) {
+    // Guard the dereference below so a null input yields null instead of an NPE.
+    return geography == null ? null : asEWKB(geography.getGeometry());
+  }
+
   public static String asHexEWKB(Geometry geom, String endian) {
     if (endian.equalsIgnoreCase("NDR")) {
       return GeomUtils.getHexEWKB(geom, ByteOrderValues.LITTLE_ENDIAN);
diff --git a/common/src/main/java/org/apache/sedona/common/geometryObjects/Geography.java b/common/src/main/java/org/apache/sedona/common/geometryObjects/Geography.java
new file mode 100644
index 0000000000..827523957b
--- /dev/null
+++ b/common/src/main/java/org/apache/sedona/common/geometryObjects/Geography.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sedona.common.geometryObjects;
+
+import org.locationtech.jts.geom.Geometry;
+
+/** Holds a reference to a JTS {@link Geometry}; adds no other state. */
+public class Geography {
+  private final Geometry geometry;
+
+  /** Stores {@code geometry} as-is (no copy, no null check). */
+  public Geography(Geometry geometry) {
+    this.geometry = geometry;
+  }
+
+  /** Returns the reference that was passed to the constructor. */
+  public Geometry getGeometry() {
+    return this.geometry;
+  }
+
+  /** Returns {@code toText()} of the wrapped geometry. */
+  @Override
+  public String toString() {
+    return this.geometry.toText();
+  }
+}
diff --git a/common/src/main/java/org/apache/sedona/common/geometrySerde/GeometrySerde.java b/common/src/main/java/org/apache/sedona/common/geometrySerde/GeometrySerde.java
index 7b7399cdb8..5475d37c7b 100644
--- a/common/src/main/java/org/apache/sedona/common/geometrySerde/GeometrySerde.java
+++ b/common/src/main/java/org/apache/sedona/common/geometrySerde/GeometrySerde.java
@@ -25,6 +25,7 @@
 import com.esotericsoftware.kryo.io.Output;
 import java.io.Serializable;
 import org.apache.sedona.common.geometryObjects.Circle;
+import org.apache.sedona.common.geometryObjects.Geography;
 import org.locationtech.jts.geom.Envelope;
 import org.locationtech.jts.geom.Geometry;
 import org.locationtech.jts.geom.GeometryCollection;
@@ -36,7 +37,7 @@
  * Provides methods to efficiently serialize and deserialize geometry types.
  *
  * <p>
Supports Point, LineString, Polygon, MultiPoint, MultiLineString, MultiPolygon,
- * GeometryCollection, Circle and Envelope types.
+ * GeometryCollection, Circle, Envelope, and Geography types.
  *
  * <p>First byte contains {@link Type#id}. Then go type-specific bytes, followed by user-data
  * attached to the geometry.
@@ -63,6 +64,9 @@ public void write(Kryo kryo, Output out, Object object) {
       out.writeDouble(envelope.getMaxX());
       out.writeDouble(envelope.getMinY());
       out.writeDouble(envelope.getMaxY());
+    } else if (object instanceof Geography) {
+      writeType(out, Type.GEOGRAPHY);
+      writeGeometry(kryo, out, ((Geography) object).getGeometry());
     } else {
       throw new UnsupportedOperationException(
           "Cannot serialize object of type " + object.getClass().getName());
@@ -118,6 +122,10 @@ public Object read(Kryo kryo, Input input, Class aClass) {
             return new Envelope();
           }
         }
+      case GEOGRAPHY:
+        {
+          return new Geography(readGeometry(kryo, input));
+        }
       default:
         throw new UnsupportedOperationException(
             "Cannot deserialize object of type " + geometryType);
@@ -145,7 +153,8 @@ private Geometry readGeometry(Kryo kryo, Input input) {
   private enum Type {
     SHAPE(0),
     CIRCLE(1),
-    ENVELOPE(2);
+    ENVELOPE(2),
+    GEOGRAPHY(3);
 
     private final int id;
 
diff --git a/python/sedona/core/geom/geography.py b/python/sedona/core/geom/geography.py
new file mode 100644
index 0000000000..262b377f62
--- /dev/null
+++ b/python/sedona/core/geom/geography.py
@@ -0,0 +1,44 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import pickle
+
+from sedona.utils.decorators import require
+
+
+class Geography:
+    """Holds a geometry in ``_geom`` plus an initially unset ``userData`` slot."""
+
+    def __init__(self, geometry):
+        self._geom = geometry
+        self.userData = None
+
+    def getUserData(self):
+        return self.userData
+
+    @classmethod
+    def from_jvm_instance(cls, java_obj):
+        # The Java class keeps `geometry` private; read it through getGeometry().
+        return cls(java_obj.getGeometry())
+
+    @classmethod
+    def serialize_for_java(cls, geogs):
+        return pickle.dumps(geogs)
+
+    @require(["Geography"])
+    def create_jvm_instance(self, jvm):
+        return jvm.Geography(self._geom)
diff --git a/python/sedona/register/java_libs.py b/python/sedona/register/java_libs.py
index 8d1681d15e..466827cbf4 100644
--- a/python/sedona/register/java_libs.py
+++ b/python/sedona/register/java_libs.py
@@ -26,6 +26,7 @@ class SedonaJvmLib(Enum):
     KNNQuery = "org.apache.sedona.core.spatialOperator.KNNQuery"
     RangeQuery = "org.apache.sedona.core.spatialOperator.RangeQuery"
     Envelope = "org.locationtech.jts.geom.Envelope"
+    Geography = "org.apache.sedona.common.geometryObjects.Geography"
     GeoSerializerData = (
         "org.apache.sedona.python.wrapper.adapters.GeoSparkPythonConverter"
     )
diff --git a/python/sedona/spark/__init__.py b/python/sedona/spark/__init__.py
index 50d1d1131e..8f4c4f24ed 100644
--- a/python/sedona/spark/__init__.py
+++ b/python/sedona/spark/__init__.py
@@ -42,7 +42,7 @@
 from sedona.sql.st_constructors import *
 from sedona.sql.st_functions import *
 from sedona.sql.st_predicates import *
-from sedona.sql.types import GeometryType, RasterType
+from sedona.sql.types import GeometryType, GeographyType, RasterType
 from sedona.utils import KryoSerializer, SedonaKryoRegistrator
 from sedona.utils.adapter import Adapter
 from sedona.utils.geoarrow import dataframe_to_arrow
diff --git a/python/sedona/sql/types.py b/python/sedona/sql/types.py
index c966d451ca..cf77b72d78 100644
--- a/python/sedona/sql/types.py
+++ b/python/sedona/sql/types.py
@@ -33,6 +33,7 @@
     SedonaRaster = None
 
 from ..utils import geometry_serde
+from ..core.geom.geography import Geography
 
 
 class GeometryType(UserDefinedType):
@@ -60,6 +61,34 @@ def scalaUDT(cls):
         return "org.apache.spark.sql.sedona_sql.UDT.GeometryUDT"
 
 
+class GeographyType(UserDefinedType):
+    """User-defined type for Geography values; its SQL type is BinaryType."""
+
+    @classmethod
+    def sqlType(cls):
+        return BinaryType()
+
+    def serialize(self, obj):
+        # Only the wrapped geometry is written; deserialize re-wraps it.
+        return geometry_serde.serialize(obj._geom)
+
+    def deserialize(self, datum):
+        # geometry_serde.deserialize returns a pair; only its first item is used here.
+        geom, _ = geometry_serde.deserialize(datum)
+        return Geography(geom)
+
+    @classmethod
+    def module(cls):
+        return "sedona.sql.types"
+
+    def needConversion(self):
+        return True
+
+    @classmethod
+    def scalaUDT(cls):
+        return "org.apache.spark.sql.sedona_sql.UDT.GeographyUDT"
+
+
 class RasterType(UserDefinedType):
 
     @classmethod
diff --git a/python/sedona/utils/geometry_adapter.py b/python/sedona/utils/geometry_adapter.py
index 9a81307d86..abc5dc5031 100644
--- a/python/sedona/utils/geometry_adapter.py
+++ b/python/sedona/utils/geometry_adapter.py
@@ -15,9 +15,12 @@
 # specific language governing permissions and limitations
 # under the License.
 
+from typing import Union
+
 from shapely.geometry.base import BaseGeometry
 
 from sedona.core.geom.envelope import Envelope
+from sedona.core.geom.geography import Geography
 from sedona.core.jvm.translate import JvmGeometryAdapter
 from sedona.utils.spatial_rdd_parser import GeometryFactory
 
@@ -25,13 +28,15 @@
 class GeometryAdapter:
 
     @classmethod
-    def create_jvm_geometry_from_base_geometry(cls, jvm, geom: BaseGeometry):
+    def create_jvm_geometry_from_base_geometry(
+        cls, jvm, geom: Union[BaseGeometry, Geography]
+    ):
         """
         :param jvm:
         :param geom:
         :return:
         """
-        if isinstance(geom, Envelope):
+        if isinstance(geom, (Envelope, Geography)):
             jvm_geom = geom.create_jvm_instance(jvm)
         else:
             decoded_geom = GeometryFactory.to_bytes(geom)
diff --git a/python/sedona/utils/prep.py b/python/sedona/utils/prep.py
index c9528300cf..8cabfa05c2 100644
--- a/python/sedona/utils/prep.py
+++ b/python/sedona/utils/prep.py
@@ -28,6 +28,8 @@
 )
 from shapely.geometry.base import BaseGeometry
 
+from ..core.geom.geography import Geography
+
 
 def assign_all() -> bool:
     geoms = [
@@ -41,6 +43,7 @@ def assign_all() -> bool:
     ]
     assign_udt_shapely_objects(geoms=geoms)
     assign_user_data_to_shapely_objects(geoms=geoms)
+    assign_udt_geography()
     return True
 
 
@@ -55,3 +58,10 @@ def assign_udt_shapely_objects(geoms: List[type(BaseGeometry)]) -> bool:
 def assign_user_data_to_shapely_objects(geoms: List[type(BaseGeometry)]) -> bool:
     for geom in geoms:
         geom.getUserData = lambda geom_instance: geom_instance.userData
+
+
+def assign_udt_geography() -> None:
+    """Attach a GeographyType instance to the Geography class as ``__UDT__``."""
+    from sedona.sql.types import GeographyType
+
+    Geography.__UDT__ = GeographyType()
diff --git a/spark/common/src/main/scala/org/apache/spark/sql/sedona_sql/UDT/GeographyUDT.scala b/spark/common/src/main/scala/org/apache/spark/sql/sedona_sql/UDT/GeographyUDT.scala
new file mode 100644
index 0000000000..48eed2b0d0
--- /dev/null
+++ b/spark/common/src/main/scala/org/apache/spark/sql/sedona_sql/UDT/GeographyUDT.scala
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.spark.sql.sedona_sql.UDT
+
+import org.apache.sedona.common.geometrySerde.GeometrySerializer
+import org.apache.spark.sql.catalyst.util.{ArrayData, GenericArrayData}
+import org.apache.spark.sql.types._
+import org.json4s.JsonDSL._
+import org.json4s.JsonAST.JValue
+import org.apache.sedona.common.geometryObjects.Geography
+
+/**
+ * Spark SQL user-defined type for [[Geography]], stored as [[BinaryType]]. Serialization covers
+ * only the wrapped geometry, via GeometrySerializer.
+ */
+class GeographyUDT extends UserDefinedType[Geography] {
+  override def sqlType: DataType = BinaryType
+
+  override def pyUDT: String = "sedona.sql.types.GeographyType"
+
+  override def userClass: Class[Geography] = classOf[Geography]
+
+  override def serialize(obj: Geography): Array[Byte] =
+    GeometrySerializer.serialize(obj.getGeometry())
+
+  override def deserialize(datum: Any): Geography = {
+    // Only byte arrays are handled; any other input fails the match with a scala.MatchError.
+    datum match {
+      case value: Array[Byte] => new Geography(GeometrySerializer.deserialize(value))
+    }
+  }
+
+  override private[sql] def jsonValue: JValue = {
+    super.jsonValue mapField {
+      case ("class", _) => "class" -> this.getClass.getName.stripSuffix("$")
+      case other: Any => other
+    }
+  }
+
+  override def equals(other: Any): Boolean = other match {
+    case _: UserDefinedType[_] => other.isInstanceOf[GeographyUDT]
+    case _ => false
+  }
+
+  override def hashCode(): Int = userClass.hashCode()
+}
+
+case object GeographyUDT
+    extends org.apache.spark.sql.sedona_sql.UDT.GeographyUDT
+    with scala.Serializable
diff --git a/spark/common/src/main/scala/org/apache/spark/sql/sedona_sql/UDT/UdtRegistratorWrapper.scala b/spark/common/src/main/scala/org/apache/spark/sql/sedona_sql/UDT/UdtRegistratorWrapper.scala
index a96d15c008..e581625556 100644
--- a/spark/common/src/main/scala/org/apache/spark/sql/sedona_sql/UDT/UdtRegistratorWrapper.scala
+++ b/spark/common/src/main/scala/org/apache/spark/sql/sedona_sql/UDT/UdtRegistratorWrapper.scala
@@ -20,12 +20,14 @@ package org.apache.spark.sql.sedona_sql.UDT
 
 import org.apache.spark.sql.types.UDTRegistration
 import org.locationtech.jts.geom.Geometry
+import org.apache.sedona.common.geometryObjects.Geography
 import org.locationtech.jts.index.SpatialIndex
 
 object UdtRegistratorWrapper {
 
   def registerAll(): Unit = {
     UDTRegistration.register(classOf[Geometry].getName, classOf[GeometryUDT].getName)
+    UDTRegistration.register(classOf[Geography].getName, classOf[GeographyUDT].getName)
     UDTRegistration.register(classOf[SpatialIndex].getName, classOf[IndexUDT].getName)
   }
 }
diff --git a/spark/common/src/main/scala/org/apache/spark/sql/sedona_sql/expressions/Constructors.scala b/spark/common/src/main/scala/org/apache/spark/sql/sedona_sql/expressions/Constructors.scala
index d787ff152b..cf2a05d3d3 100644
--- a/spark/common/src/main/scala/org/apache/spark/sql/sedona_sql/expressions/Constructors.scala
+++ b/spark/common/src/main/scala/org/apache/spark/sql/sedona_sql/expressions/Constructors.scala
@@ -98,6 +98,20 @@ case class ST_GeomFromWKT(inputExpressions: Seq[Expression])
   }
 }
 
+/**
+ * Return a Geography from a WKT string
+ *
+ * @param inputExpressions
+ *   This function takes a geometry string and a srid. The string format must be WKT.
+ */
+case class ST_GeogFromWKT(inputExpressions: Seq[Expression])
+    extends InferredExpression(Constructors.geogFromWKT _) {
+
+  protected def withNewChildrenInternal(newChildren: IndexedSeq[Expression]) = {
+    copy(inputExpressions = newChildren)
+  }
+}
+
 /**
  * Return a Geometry from a OGC Extended WKT string
  *
diff --git a/spark/common/src/main/scala/org/apache/spark/sql/sedona_sql/expressions/Functions.scala b/spark/common/src/main/scala/org/apache/spark/sql/sedona_sql/expressions/Functions.scala
index 705e37758b..e720ba3712 100644
--- a/spark/common/src/main/scala/org/apache/spark/sql/sedona_sql/expressions/Functions.scala
+++ b/spark/common/src/main/scala/org/apache/spark/sql/sedona_sql/expressions/Functions.scala
@@ -506,7 +506,9 @@ case class ST_AsBinary(inputExpressions: Seq[Expression])
 }
 
+// NOTE(review): ST_AsEWKB used to be bound to Functions.asEWKB (Geometry input) and is now
+// bound to Functions.geogAsEWKB (Geography input) -- confirm Geometry arguments still resolve.
 case class ST_AsEWKB(inputExpressions: Seq[Expression])
-    extends InferredExpression(Functions.asEWKB _) {
+    extends InferredExpression(Functions.geogAsEWKB _) {
 
   protected def withNewChildrenInternal(newChildren: IndexedSeq[Expression]) = {
     copy(inputExpressions = newChildren)
diff --git a/spark/common/src/main/scala/org/apache/spark/sql/sedona_sql/expressions/InferredExpression.scala b/spark/common/src/main/scala/org/apache/spark/sql/sedona_sql/expressions/InferredExpression.scala
index 935c2d5e3d..b138c7a2f9 100644
--- a/spark/common/src/main/scala/org/apache/spark/sql/sedona_sql/expressions/InferredExpression.scala
+++ b/spark/common/src/main/scala/org/apache/spark/sql/sedona_sql/expressions/InferredExpression.scala
@@ -23,7 +23,7 @@ import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.{Expression, ImplicitCastInputTypes}
 import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback
 import org.apache.spark.sql.catalyst.util.ArrayData
-import org.apache.spark.sql.sedona_sql.UDT.GeometryUDT
+import org.apache.spark.sql.sedona_sql.UDT.{GeometryUDT, GeographyUDT}
 import org.apache.spark.sql.types.{AbstractDataType, BinaryType, BooleanType, DataType, DataTypes, DoubleType, IntegerType, LongType, StringType}
 import org.apache.spark.unsafe.types.UTF8String
 import org.locationtech.jts.geom.Geometry
@@ -34,6 +34,7 @@ import scala.collection.mutable.ArrayBuffer
 import scala.reflect.runtime.universe.TypeTag
 import scala.reflect.runtime.universe.Type
 import scala.reflect.runtime.universe.typeOf
+import org.apache.sedona.common.geometryObjects.Geography
 
 /**
  * Custom exception to include the input row and the original exception message.
@@ -160,6 +161,10 @@ object InferrableType {
     new InferrableType[Geometry] {}
   implicit val geometryArrayInstance: InferrableType[Array[Geometry]] =
     new InferrableType[Array[Geometry]] {}
+  implicit val geographyInstance: InferrableType[Geography] =
+    new InferrableType[Geography] {}
+  implicit val geographyArrayInstance: InferrableType[Array[Geography]] =
+    new InferrableType[Array[Geography]] {}
   implicit val javaDoubleInstance: InferrableType[java.lang.Double] =
     new InferrableType[java.lang.Double] {}
   implicit val javaIntegerInstance: InferrableType[java.lang.Integer] =
@@ -230,6 +235,12 @@ object InferredTypes {
       } else {
         null
       }
+    } else if (t =:= typeOf[Geography]) { output =>
+      if (output != null) {
+        output.asInstanceOf[Geography].toGenericArrayData
+      } else {
+        null
+      }
     } else if (InferredRasterExpression.isRasterType(t)) {
       InferredRasterExpression.rasterSerializer
     } else if (t =:= typeOf[String]) { output =>
@@ -259,6 +270,18 @@ object InferredTypes {
         } else {
           null
         }
+    } else if (t =:= typeOf[Array[Geography]] || t =:= typeOf[java.util.List[Geography]]) {
+      output =>
+        output match {
+          case null => null
+          case geogs: Array[Geography] =>
+            ArrayData.toArrayData(geogs.map(_.toGenericArrayData))
+          case geogs: java.util.List[_] =>
+            // A java.util.List is not an Array[Geography], so it is copied instead of cast.
+            ArrayData.toArrayData(
+              geogs.toArray().map(_.asInstanceOf[Geography].toGenericArrayData))
+        }
+
     } else if (InferredRasterExpression.isRasterArrayType(t)) {
       InferredRasterExpression.rasterArraySerializer
     } else if (t =:= typeOf[Option[Boolean]]) { output =>
@@ -277,6 +300,10 @@ object InferredTypes {
       GeometryUDT
     } else if (t =:= typeOf[Array[Geometry]] || t =:= typeOf[java.util.List[Geometry]]) {
       DataTypes.createArrayType(GeometryUDT)
+    } else if (t =:= typeOf[Geography]) {
+      GeographyUDT
+    } else if (t =:= typeOf[Array[Geography]] || t =:= typeOf[java.util.List[Geography]]) {
+      DataTypes.createArrayType(GeographyUDT)
     } else if (InferredRasterExpression.isRasterType(t)) {
       InferredRasterExpression.rasterUDT
     } else if (InferredRasterExpression.isRasterArrayType(t)) {
diff --git a/spark/common/src/main/scala/org/apache/spark/sql/sedona_sql/expressions/implicits.scala b/spark/common/src/main/scala/org/apache/spark/sql/sedona_sql/expressions/implicits.scala
index fd55cc02d8..1387053b83 100644
--- a/spark/common/src/main/scala/org/apache/spark/sql/sedona_sql/expressions/implicits.scala
+++ b/spark/common/src/main/scala/org/apache/spark/sql/sedona_sql/expressions/implicits.scala
@@ -24,6 +24,7 @@ import org.apache.spark.sql.catalyst.expressions.Expression
 import org.apache.spark.sql.catalyst.util.ArrayData
 import org.apache.spark.unsafe.types.UTF8String
 import org.locationtech.jts.geom.{Geometry, GeometryFactory, Point}
+import org.apache.sedona.common.geometryObjects.Geography
 
 object implicits {
 
@@ -141,4 +142,10 @@ object implicits {
 
     def isNonEmpty: Boolean = geom != null && !geom.isEmpty
   }
+
+  implicit class GeographyEnhancer(geog: Geography) {
+
+    /** Serializes the wrapped geometry with GeometrySerializer. */
+    def toGenericArrayData: Array[Byte] = GeometrySerializer.serialize(geog.getGeometry())
+  }
 }