diff --git a/spark/iceberg/pom.xml b/spark/iceberg/pom.xml new file mode 100644 index 0000000000..759aa9802b --- /dev/null +++ b/spark/iceberg/pom.xml @@ -0,0 +1,117 @@ + + + + 4.0.0 + + org.apache.sedona + sedona-spark-parent-${spark.compat.version}_${scala.compat.version} + 1.7.1-SNAPSHOT + ../pom.xml + + sedona-spark-iceberg-${spark.compat.version}_${scala.compat.version} + + ${project.groupId}:${project.artifactId} + A cluster computing system for processing large-scale spatial data: Iceberg Spark Support. + https://sedona.apache.org/ + jar + + + false + + + + + org.apache.sedona + sedona-common + ${project.version} + provided + + + com.fasterxml.jackson.core + * + + + + + org.apache.sedona + sedona-spark-common-${spark.compat.version}_${scala.compat.version} + ${project.version} + provided + + + org.apache.spark + spark-core_${scala.compat.version} + + + org.apache.spark + spark-sql_${scala.compat.version} + + + org.apache.iceberg + iceberg-spark-runtime-${spark.compat.version}_${scala.compat.version} + ${iceberg.version} + provided + + + org.locationtech.jts + jts-core + provided + + + org.scala-lang + scala-library + provided + + + org.scala-lang.modules + scala-collection-compat_${scala.compat.version} + provided + + + org.scalatest + scalatest_${scala.compat.version} + + + + src/main/scala + + + net.alchim31.maven + scala-maven-plugin + + + attach-javadocs + + true + + + + + + org.scalatest + scalatest-maven-plugin + + + org.scalastyle + scalastyle-maven-plugin + + + + diff --git a/spark/iceberg/src/main/resources/META-INF/services/org.apache.iceberg.spark.geo.spi.GeospatialLibraryProvider b/spark/iceberg/src/main/resources/META-INF/services/org.apache.iceberg.spark.geo.spi.GeospatialLibraryProvider new file mode 100644 index 0000000000..4d7bd7b29c --- /dev/null +++ b/spark/iceberg/src/main/resources/META-INF/services/org.apache.iceberg.spark.geo.spi.GeospatialLibraryProvider @@ -0,0 +1 @@ +org.apache.sedona.SedonaGeospatialLibraryProvider diff --git a/spark/iceberg/src/main/scala/org/apache/sedona/SedonaGeospatialLibrary.scala b/spark/iceberg/src/main/scala/org/apache/sedona/SedonaGeospatialLibrary.scala new file mode 100644 index 0000000000..7a1a86a1ee --- /dev/null +++ b/spark/iceberg/src/main/scala/org/apache/sedona/SedonaGeospatialLibrary.scala @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.sedona + +import SedonaGeospatialLibrary.BACKTICKS_PATTERN +import org.apache.iceberg.expressions.{Expressions => IcebergExpressions} +import org.apache.iceberg.spark.geo.spi.GeospatialLibrary +import org.apache.iceberg.{Geography, expressions} +import org.apache.sedona.common.geometryObjects.{Geography => SedonaGeography} +import org.apache.spark.sql.catalyst.expressions.{Expression, Literal} +import org.apache.spark.sql.execution.datasources.{PushableColumn, PushableColumnBase} +import org.apache.spark.sql.sedona_sql.UDT.{GeographyUDT, GeometryUDT} +import org.apache.spark.sql.sedona_sql.expressions._ +import org.apache.spark.sql.types.DataType +import org.locationtech.jts.geom.Geometry + +import java.util.regex.Pattern + +class SedonaGeospatialLibrary extends GeospatialLibrary { + override def getGeometryType: DataType = GeometryUDT + + override def getGeographyType: DataType = GeographyUDT + + override def fromGeometry(geometry: Geometry): AnyRef = GeometryUDT.serialize(geometry) + + override def toGeometry(datum: Any): Geometry = GeometryUDT.deserialize(datum) + + override def fromGeography(geography: Geography): AnyRef = + GeographyUDT.serialize(new SedonaGeography(geography.geometry())) + + override def toGeography(datum: Any): Geography = { + val sedonaGeography = GeographyUDT.deserialize(datum) + new Geography(sedonaGeography.getGeometry) + } + + override def isSpatialFilter(expression: Expression): Boolean = expression match { + case pred: ST_Predicate => !pred.isInstanceOf[ST_Disjoint] + case _ => false + } + + override def translateToIceberg(expression: Expression): expressions.Expression = { + val pushableColumn = PushableColumn(nestedPredicatePushdownEnabled = true) + expression match { + case ST_Intersects(_) | ST_Contains(_) | ST_Covers(_) | ST_Within(_) | ST_CoveredBy(_) => + val icebergExpr = { + for ((name, value) <- resolveNameAndLiteral(expression.children, pushableColumn)) + yield IcebergExpressions.stIntersects(unquote(name), GeometryUDT.deserialize(value)) + } + icebergExpr.orNull + case _ => null + } + } + + private def unquote(attributeName: String) = { + val matcher = BACKTICKS_PATTERN.matcher(attributeName) + matcher.replaceAll("$2") + } + + private def resolveNameAndLiteral( + expressions: Seq[Expression], + pushableColumn: PushableColumnBase): Option[(String, Any)] = { + expressions match { + case Seq(pushableColumn(name), Literal(v, _)) => Some(name, v) + case Seq(Literal(v, _), pushableColumn(name)) => Some(name, v) + case _ => None + } + } +} + +object SedonaGeospatialLibrary { + private val BACKTICKS_PATTERN = Pattern.compile("""([`])(.|$)""") +} diff --git a/spark/iceberg/src/main/scala/org/apache/sedona/SedonaGeospatialLibraryProvider.scala b/spark/iceberg/src/main/scala/org/apache/sedona/SedonaGeospatialLibraryProvider.scala new file mode 100644 index 0000000000..e57236ca3c --- /dev/null +++ b/spark/iceberg/src/main/scala/org/apache/sedona/SedonaGeospatialLibraryProvider.scala @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.sedona + +import org.apache.iceberg.spark.geo.spi.GeospatialLibrary; +import org.apache.iceberg.spark.geo.spi.GeospatialLibraryProvider; + +class SedonaGeospatialLibraryProvider extends GeospatialLibraryProvider { + override def name(): String = "sedona" + + override def create(): GeospatialLibrary = new SedonaGeospatialLibrary() +} diff --git a/spark/iceberg/src/test/scala/org/apache/sedona/TestGeospatial.scala b/spark/iceberg/src/test/scala/org/apache/sedona/TestGeospatial.scala new file mode 100644 index 0000000000..44e7f0177a --- /dev/null +++ b/spark/iceberg/src/test/scala/org/apache/sedona/TestGeospatial.scala @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.sedona + +import org.apache.spark.sql.SparkSession +import org.scalatest.BeforeAndAfterAll +import org.scalatest.funsuite.AnyFunSuite + +import java.io.File + +class TestGeospatial extends AnyFunSuite with BeforeAndAfterAll { + + private var warehouse: File = _ + + override protected def beforeAll(): Unit = { + super.beforeAll() + warehouse = File.createTempFile("warehouse", null) + warehouse.delete() + } + + override protected def afterAll(): Unit = { + super.afterAll() + warehouse.delete() + } + + test("test geospatial") { + val spark = SparkSession + .builder() + .master("local[*]") + .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") + .config("spark.kryo.registrator", "org.apache.sedona.core.serde.SedonaKryoRegistrator") + .config("spark.sql.catalog.local", "org.apache.iceberg.spark.SparkCatalog") + .config("spark.sql.catalog.local.type", "hadoop") + .config("spark.sql.catalog.local.warehouse", warehouse.getAbsolutePath) + .config( + "spark.sql.extensions", + "org.apache.sedona.sql.SedonaSqlExtensions,org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") + .config("spark.sedona.enableParserExtension", "false") + .getOrCreate() + + spark + .sql( + "CREATE OR REPLACE TABLE local.tmp.geom_table (id INT, geom GEOMETRY) USING iceberg TBLPROPERTIES ('format-version' = '3')") + .show() + spark + .sql(""" + INSERT INTO local.tmp.geom_table VALUES + (1, ST_GeomFromText('POLYGON ((0 0, 0 10, 10 10, 10 0, 0 0))')), + (2, ST_GeomFromText('POINT (100 40)')) + """) + .show() + spark.sql("SELECT * FROM local.tmp.geom_table").show() + spark + .sql(""" + SELECT * FROM local.tmp.geom_table WHERE ST_Intersects(geom, ST_GeomFromText('POINT (1 2)')) + """) + .show() + + spark + .sql( + "CREATE OR REPLACE TABLE local.tmp.geog_table (id INT, geog GEOGRAPHY) USING iceberg TBLPROPERTIES ('format-version' = '3')") + .show() + spark + .sql(""" + INSERT INTO local.tmp.geog_table VALUES + (1, ST_GeogFromWKT('POLYGON ((0 0, 0 10, 10 10, 10 0, 0 0))')), + (2, ST_GeogFromWKT('POINT (100 40)')) + """) + .show() + spark.sql("SELECT * FROM local.tmp.geog_table").show() + } +} diff --git a/spark/pom.xml b/spark/pom.xml index c378d379b2..7c4efd87f3 100644 --- a/spark/pom.xml +++ b/spark/pom.xml @@ -38,6 +38,7 @@ common + iceberg spark-${spark.compat.version}