Skip to content

Commit

Permalink
Add Iceberg Spark geospatial support library
Browse files Browse the repository at this point in the history
  • Loading branch information
Kontinuation committed Feb 25, 2025
1 parent aede0eb commit 4591abf
Show file tree
Hide file tree
Showing 6 changed files with 323 additions and 0 deletions.
117 changes: 117 additions & 0 deletions spark/iceberg/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
<!--
~ Licensed to the Apache Software Foundation (ASF) under one
~ or more contributor license agreements. See the NOTICE file
~ distributed with this work for additional information
~ regarding copyright ownership. The ASF licenses this file
~ to you under the Apache License, Version 2.0 (the
~ "License"); you may not use this file except in compliance
~ with the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing,
~ software distributed under the License is distributed on an
~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
~ KIND, either express or implied. See the License for the
~ specific language governing permissions and limitations
~ under the License.
-->

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.sedona</groupId>
<artifactId>sedona-spark-parent-${spark.compat.version}_${scala.compat.version}</artifactId>
<version>1.7.1-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
<artifactId>sedona-spark-iceberg-${spark.compat.version}_${scala.compat.version}</artifactId>

<name>${project.groupId}:${project.artifactId}</name>
<description>A cluster computing system for processing large-scale spatial data: Iceberg Spark Support.</description>
<url>https://sedona.apache.org/</url>
<packaging>jar</packaging>

<properties>
<maven.deploy.skip>false</maven.deploy.skip>
</properties>

<dependencies>
<dependency>
<groupId>org.apache.sedona</groupId>
<artifactId>sedona-common</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
<exclusions>
<exclusion>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.sedona</groupId>
<artifactId>sedona-spark-common-${spark.compat.version}_${scala.compat.version}</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_${scala.compat.version}</artifactId>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_${scala.compat.version}</artifactId>
</dependency>
<dependency>
<groupId>org.apache.iceberg</groupId>
<artifactId>iceberg-spark-runtime-${spark.compat.version}_${scala.compat.version}</artifactId>
<version>${iceberg.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.locationtech.jts</groupId>
<artifactId>jts-core</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.scala-lang.modules</groupId>
<artifactId>scala-collection-compat_${scala.compat.version}</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.scalatest</groupId>
<artifactId>scalatest_${scala.compat.version}</artifactId>
</dependency>
</dependencies>
<build>
<sourceDirectory>src/main/scala</sourceDirectory>
<plugins>
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<executions>
<execution>
<id>attach-javadocs</id>
<configuration>
<skip>true</skip>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.scalatest</groupId>
<artifactId>scalatest-maven-plugin</artifactId>
</plugin>
<plugin>
<groupId>org.scalastyle</groupId>
<artifactId>scalastyle-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
org.apache.sedona.SedonaGeospatialLibraryProvider
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.sedona

import SedonaGeospatialLibrary.BACKTICKS_PATTERN
import org.apache.iceberg.expressions.{Expressions => IcebergExpressions}
import org.apache.iceberg.spark.geo.spi.GeospatialLibrary
import org.apache.iceberg.{Geography, expressions}
import org.apache.sedona.common.geometryObjects.{Geography => SedonaGeography}
import org.apache.spark.sql.catalyst.expressions.{Expression, Literal}
import org.apache.spark.sql.execution.datasources.{PushableColumn, PushableColumnBase}
import org.apache.spark.sql.sedona_sql.UDT.{GeographyUDT, GeometryUDT}
import org.apache.spark.sql.sedona_sql.expressions._
import org.apache.spark.sql.types.DataType
import org.locationtech.jts.geom.Geometry

import java.util.regex.Pattern

class SedonaGeospatialLibrary extends GeospatialLibrary {
override def getGeometryType: DataType = GeometryUDT

override def getGeographyType: DataType = GeographyUDT

override def fromGeometry(geometry: Geometry): AnyRef = GeometryUDT.serialize(geometry)

override def toGeometry(datum: Any): Geometry = GeometryUDT.deserialize(datum)

override def fromGeography(geography: Geography): AnyRef =
GeographyUDT.serialize(new SedonaGeography(geography.geometry()))

override def toGeography(datum: Any): Geography = {
val sedonaGeography = GeographyUDT.deserialize(datum)
new Geography(sedonaGeography.getGeometry)
}

override def isSpatialFilter(expression: Expression): Boolean = expression match {
case pred: ST_Predicate => !pred.isInstanceOf[ST_Disjoint]
case _ => false
}

override def translateToIceberg(expression: Expression): expressions.Expression = {
val pushableColumn = PushableColumn(nestedPredicatePushdownEnabled = true)
expression match {
case ST_Intersects(_) | ST_Contains(_) | ST_Covers(_) | ST_Within(_) | ST_CoveredBy(_) =>
val icebergExpr = {
for ((name, value) <- resolveNameAndLiteral(expression.children, pushableColumn))
yield IcebergExpressions.stIntersects(unquote(name), GeometryUDT.deserialize(value))
}
icebergExpr.orNull
case _ => null
}
}

private def unquote(attributeName: String) = {
val matcher = BACKTICKS_PATTERN.matcher(attributeName)
matcher.replaceAll("$2")
}

private def resolveNameAndLiteral(
expressions: Seq[Expression],
pushableColumn: PushableColumnBase): Option[(String, Any)] = {
expressions match {
case Seq(pushableColumn(name), Literal(v, _)) => Some(name, v)
case Seq(Literal(v, _), pushableColumn(name)) => Some(name, v)
case _ => None
}
}
}

object SedonaGeospatialLibrary {
private val BACKTICKS_PATTERN = Pattern.compile("""([`])(.|$)""")
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.sedona

import org.apache.iceberg.spark.geo.spi.GeospatialLibrary;
import org.apache.iceberg.spark.geo.spi.GeospatialLibraryProvider;

class SedonaGeospatialLibraryProvider extends GeospatialLibraryProvider {
override def name(): String = "sedona"

override def create(): GeospatialLibrary = new SedonaGeospatialLibrary()
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.sedona

import org.apache.spark.sql.SparkSession
import org.scalatest.BeforeAndAfterAll
import org.scalatest.funsuite.AnyFunSuite

import java.io.File

class TestGeospatial extends AnyFunSuite with BeforeAndAfterAll {

private var warehouse: File = _

override protected def beforeAll(): Unit = {
super.beforeAll()
warehouse = File.createTempFile("warehouse", null)
warehouse.delete()
}

override protected def afterAll(): Unit = {
super.afterAll()
warehouse.delete()
}

test("test geospatial") {
val spark = SparkSession
.builder()
.master("local[*]")
.config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.config("spark.kryo.registrator", "org.apache.sedona.core.serde.SedonaKryoRegistrator")
.config("spark.sql.catalog.local", "org.apache.iceberg.spark.SparkCatalog")
.config("spark.sql.catalog.local.type", "hadoop")
.config("spark.sql.catalog.local.warehouse", warehouse.getAbsolutePath)
.config(
"spark.sql.extensions",
"org.apache.sedona.sql.SedonaSqlExtensions,org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
.config("spark.sedona.enableParserExtension", "false")
.getOrCreate()

spark
.sql(
"CREATE OR REPLACE TABLE local.tmp.geom_table (id INT, geom GEOMETRY) USING iceberg TBLPROPERTIES ('format-version' = '3')")
.show()
spark
.sql("""
INSERT INTO local.tmp.geom_table VALUES
(1, ST_GeomFromText('POLYGON ((0 0, 0 10, 10 10, 10 0, 0 0))')),
(2, ST_GeomFromText('POINT (100 40)'))
""")
.show()
spark.sql("SELECT * FROM local.tmp.geom_table").show()
spark
.sql("""
SELECT * FROM local.tmp.geom_table WHERE ST_Intersects(geom, ST_GeomFromText('POINT (1 2)'))
""")
.show()

spark
.sql(
"CREATE OR REPLACE TABLE local.tmp.geog_table (id INT, geog GEOGRAPHY) USING iceberg TBLPROPERTIES ('format-version' = '3')")
.show()
spark
.sql("""
INSERT INTO local.tmp.geog_table VALUES
(1, ST_GeogFromWKT('POLYGON ((0 0, 0 10, 10 10, 10 0, 0 0))')),
(2, ST_GeogFromWKT('POINT (100 40)'))
""")
.show()
spark.sql("SELECT * FROM local.tmp.geog_table").show()
}
}
1 change: 1 addition & 0 deletions spark/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@

<modules>
<module>common</module>
<module>iceberg</module>
<module>spark-${spark.compat.version}</module>
</modules>

Expand Down

0 comments on commit 4591abf

Please sign in to comment.