Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SEDONA-468] Add useSpheroid provision in ST_DWithin #1208

Merged
merged 32 commits into from
Jan 31, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
21258e8
Add ST_DWithin
iGN5117 Jan 1, 2024
e887506
Add documentation for ST_DWithin
iGN5117 Jan 1, 2024
f1b3f84
Remove unwanted code
iGN5117 Jan 1, 2024
341017d
removed null check test for ST_DWithin
iGN5117 Jan 1, 2024
4f6f09d
Fix EOF lint error
iGN5117 Jan 1, 2024
a41339d
Add explanation for ST_DWithin
iGN5117 Jan 2, 2024
043223d
Remove CRS checking logic in ST_DWithin
iGN5117 Jan 2, 2024
0079af5
Add optimized join support for ST_DWithin
iGN5117 Jan 14, 2024
747dca8
Merge branch 'develop_Nilesh_1.5.1' of https://github.com/iGN5117/sed…
iGN5117 Jan 14, 2024
1c61199
Remove test change to resourceFolder
iGN5117 Jan 14, 2024
f9ee1ec
remove unnecessary cast to double
iGN5117 Jan 14, 2024
8a0fc7d
Add broadcast join test
iGN5117 Jan 14, 2024
158325f
Add example of ST_DWithin in Optimizer.md
iGN5117 Jan 14, 2024
cda4802
Add useSpheroid version to ST_DWithin | Add optimized join support
iGN5117 Jan 19, 2024
1fe3c98
Merge branch 'sedona-master' into develop_Nilesh_1.5.1
iGN5117 Jan 19, 2024
91f3ca7
remove accidental resourceFolder change
iGN5117 Jan 19, 2024
3bf9c99
Fix mistake in making useSpheroid optional in ST_DWithin
iGN5117 Jan 19, 2024
ecac80d
Fix incorrect test data in test_dataframe_api.py
iGN5117 Jan 19, 2024
57f3f94
fix failing test in test_predicate.py
iGN5117 Jan 19, 2024
ae9f131
Address PR changes | Move ST_DWithin to DistanceJoin
iGN5117 Jan 20, 2024
8488cad
fix failing test
iGN5117 Jan 20, 2024
c1e85b9
Remove randomness from sphere test case generation
iGN5117 Jan 25, 2024
22422d7
Merge branch 'sedona-master' into develop_Nilesh_1.5.1
iGN5117 Jan 25, 2024
e486724
Refactor documentation of ST_DWithin
iGN5117 Jan 25, 2024
5a31de2
revert resourceFolder path
iGN5117 Jan 25, 2024
b569d89
Handle complex boolean expressions in ST_DWithin
iGN5117 Jan 27, 2024
a5022b2
add a blanket try catch for ST_DWithin to handle complex boolean expr…
iGN5117 Jan 28, 2024
7e6cd49
add collect to the python test
iGN5117 Jan 28, 2024
d0bfbaa
replace head() with count()
iGN5117 Jan 28, 2024
51b3d11
Merge branch 'sedona-master' into develop_Nilesh_1.5.1
iGN5117 Jan 28, 2024
5352fb9
Add null check for geometry column while adding a df to keplergl
iGN5117 Jan 28, 2024
172ae05
Revert "Add null check for geometry column while adding a df to keple…
iGN5117 Jan 28, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 7 additions & 7 deletions binder/Sedona_OvertureMaps_GeoParquet.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -23,19 +23,19 @@
"tags": []
},
"source": [
"# Overture Maps data"
"# Overture Map data"
]
},
{
"cell_type": "markdown",
"id": "3cfd484a-e6ac-4cdb-a8ac-2ed77fbd23db",
"metadata": {},
"source": [
"## Wherobots distribution\n",
"## Wherobots version\n",
"\n",
"At this time, Wherobots only maintains a distribution of the Overture Maps dataset for `overturemaps-us-west-2/release/2023-07-26-alpha.0/`.\n",
"Wherobots only releases a version for overturemaps-us-west-2/release/2023-07-26-alpha.0/.\n",
"\n",
"This data is in GeoParquet format and is clustered by spatial promity to ensure efficient filter pushdown performance."
"This data is in GeoParquet format and data is clusterd by their spatial promity to ensure efficient filter pushdown performance"
]
},
{
Expand All @@ -47,17 +47,17 @@
},
"outputs": [],
"source": [
"DATA_LINK = \"s3a://wherobots-examples/data/overturemaps-us-west-2/release/2023-07-26-alpha.0/\""
"DATA_LINK = \"s3a://wherobots-public-data/overturemaps-us-west-2/release/2023-07-26-alpha.0/\""
]
},
{
"cell_type": "markdown",
"id": "b5fcf043-a97d-4321-ac16-fafcdbbcf3aa",
"metadata": {},
"source": [
"## Overture Maps Foundation distributions\n",
"## OMF versions\n",
"\n",
"The following files are from the official Overture Maps Foundation releases. They are GeoParquet files generated by Apache Sedona.\n",
"The following files are official OMF releases. They are GeoParquet files generated by Apache Sedona.\n",
"\n",
"However, unlike the Wherobots version, these data might not be in the same file structures and hence the spatial query might be a bit slower.\n"
]
Expand Down
12 changes: 11 additions & 1 deletion common/src/main/java/org/apache/sedona/common/Predicates.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package org.apache.sedona.common;

import org.locationtech.jts.geom.Geometry;
import org.apache.sedona.common.sphere.Spheroid;

public class Predicates {
public static boolean contains(Geometry leftGeometry, Geometry rightGeometry) {
Expand Down Expand Up @@ -50,6 +51,15 @@ public static boolean orderingEquals(Geometry leftGeometry, Geometry rightGeomet
return leftGeometry.equalsExact(rightGeometry);
}
public static boolean dWithin(Geometry leftGeometry, Geometry rightGeometry, double distance) {
return leftGeometry.isWithinDistance(rightGeometry, distance);
return dWithin(leftGeometry, rightGeometry, distance, false);
}

public static boolean dWithin(Geometry leftGeometry, Geometry rightGeometry, double distance, boolean useSpheroid) {
if (useSpheroid) {
double distanceSpheroid = Spheroid.distance(leftGeometry, rightGeometry);
return distanceSpheroid <= distance;
}else {
return leftGeometry.isWithinDistance(rightGeometry, distance);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -62,5 +62,15 @@ public void testDWithinGeomCollection() {
assertTrue(actual);
}

@Test
public void testDWithinSpheroid() {
Geometry seattlePoint = GEOMETRY_FACTORY.createPoint(new Coordinate(-122.335167, 47.608013));
Geometry newYorkPoint = GEOMETRY_FACTORY.createPoint(new Coordinate(-73.935242, 40.730610));

double distance = 4000 * 1e3; //distance between NY and Seattle is less than 4000 km
boolean actual = Predicates.dWithin(newYorkPoint, seattlePoint, distance, true);
assertTrue(actual);
}


}
23 changes: 21 additions & 2 deletions docs/api/flink/Predicate.md
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,15 @@ true

## ST_DWithin

Introduction: Returns true if 'leftGeometry' and 'rightGeometry' are within a specified 'distance'. This function essentially checks if the shortest distance between the envelope of the two geometries is <= the provided distance.
Introduction: Returns true if 'leftGeometry' and 'rightGeometry' are within a specified 'distance'.

Format: `ST_DWithin (leftGeometry: Geometry, rightGeometry: Geometry, distance: Double)`
If `useSpheroid` is passed true, ST_DWithin uses Sedona's ST_DistanceSpheroid to check the spheroid distance between the centroids of two geometries. The unit of the distance in this case is meter.

If `useSpheroid` is passed false, ST_DWithin uses Euclidean distance and the unit of the distance is the same as the CRS of the geometries. To obtain the correct result, please consider using ST_Transform to put data in an appropriate CRS.

If useSpheroid is not given, it defaults to false

Format: `ST_DWithin (leftGeometry: Geometry, rightGeometry: Geometry, distance: Double, useSpheroid: Optional(Boolean) = false)`

Since: `v1.5.1`

Expand All @@ -78,6 +84,19 @@ Output:
true
```

```text
Check for distance between New York and Seattle (< 4000 km)
```

```sql
SELECT ST_DWithin(ST_GeomFromWKT(-122.335167 47.608013), ST_GeomFromWKT(-73.935242 40.730610), 4000000, true)
```

Output:
```
true
```

## ST_Equals

Introduction: Return true if A equals to B
Expand Down
23 changes: 21 additions & 2 deletions docs/api/sql/Predicate.md
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,15 @@ true

## ST_DWithin

Introduction: Returns true if 'leftGeometry' and 'rightGeometry' are within a specified 'distance'. This function essentially checks if the shortest distance between the envelope of the two geometries is <= the provided distance.
Introduction: Returns true if 'leftGeometry' and 'rightGeometry' are within a specified 'distance'.

Format: `ST_DWithin (leftGeometry: Geometry, rightGeometry: Geometry, distance: Double)`
If `useSpheroid` is passed true, ST_DWithin uses Sedona's ST_DistanceSpheroid to check the spheroid distance between the centroids of two geometries. The unit of the distance in this case is meter.

If `useSpheroid` is passed false, ST_DWithin uses Euclidean distance and the unit of the distance is the same as the CRS of the geometries. To obtain the correct result, please consider using ST_Transform to put data in an appropriate CRS.

If useSpheroid is not given, it defaults to false

Format: `ST_DWithin (leftGeometry: Geometry, rightGeometry: Geometry, distance: Double, useSpheroid: Optional(Boolean) = false)`

Since: `v1.5.1`

Expand All @@ -78,6 +84,19 @@ Output:
true
```

```text
Check for distance between New York and Seattle (< 4000 km)
```

```sql
SELECT ST_DWithin(ST_GeomFromWKT(-122.335167 47.608013), ST_GeomFromWKT(-73.935242 40.730610), 4000000, true)
```

Output:
```
true
```

## ST_Equals

Introduction: Return true if A equals to B
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -248,5 +248,13 @@ public Boolean eval(@DataTypeHint(value = "RAW", bridgedTo = org.locationtech.jt
Geometry geom2 = (Geometry) o2;
return org.apache.sedona.common.Predicates.dWithin(geom1, geom2, distance);
}

@DataTypeHint("Boolean")
public Boolean eval(@DataTypeHint(value = "RAW", bridgedTo = org.locationtech.jts.geom.Geometry.class) Object o1, @DataTypeHint(value = "RAW", bridgedTo = org.locationtech.jts.geom.Geometry.class) Object o2, @DataTypeHint("Double") Double distance, @DataTypeHint("Boolean") Boolean useSphere)
{
Geometry geom1 = (Geometry) o1;
Geometry geom2 = (Geometry) o2;
return org.apache.sedona.common.Predicates.dWithin(geom1, geom2, distance, useSphere);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -145,4 +145,12 @@ public void testDWithinFailure() {
Boolean actual = (Boolean) first(table).getField(0);
assertEquals(false, actual);
}

@Test
public void testDWithinSphere() {
Table table = tableEnv.sqlQuery("SELECT ST_GeomFromWKT('POINT (-122.335167 47.608013)') as seattle, ST_GeomFromWKT('POINT (-73.935242 40.730610)') as ny");
table = table.select(call(Predicates.ST_DWithin.class.getSimpleName(), $("seattle"), $("ny"), 3500 * 1e3, true));
Boolean actual = (Boolean) first(table).getField(0);
assertEquals(false, actual);
}
}
9 changes: 5 additions & 4 deletions python/sedona/sql/st_predicates.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
from functools import partial

from pyspark.sql import Column
from typing import Union
from typing import Union, Optional

from sedona.sql.dataframe_api import ColumnOrName, call_sedona_function, validate_argument_types

Expand Down Expand Up @@ -194,13 +194,14 @@ def ST_CoveredBy(a: ColumnOrName, b: ColumnOrName) -> Column:
return _call_predicate_function("ST_CoveredBy", (a, b))

@validate_argument_types
def ST_DWithin(a: ColumnOrName, b: ColumnOrName, distance: Union[ColumnOrName, float]):
def ST_DWithin(a: ColumnOrName, b: ColumnOrName, distance: Union[ColumnOrName, float], use_sphere: Optional[Union[ColumnOrName, bool]] = None):
"""
Check if geometry a is within 'distance' units of geometry b
:param a: Geometry column to check
:param b: Geometry column to check
:param distance: distance units to check the within predicate
:param use_sphere: whether to use spheroid distance or euclidean distance
:return: True if a is within distance units of Geometry b
"""

return _call_predicate_function("ST_DWithin", (a, b, distance))
args = (a, b, distance, use_sphere) if use_sphere is not None else (a, b, distance,)
return _call_predicate_function("ST_DWithin", args)
5 changes: 4 additions & 1 deletion python/tests/sql/test_dataframe_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,8 @@
(stp.ST_Within, (lambda: f.expr("ST_Point(0.0, 0.0)"), "geom"), "triangle_geom", "", False),
(stp.ST_Covers, ("geom", lambda: f.expr("ST_Point(0.0, 0.0)")), "triangle_geom", "", True),
(stp.ST_CoveredBy, (lambda: f.expr("ST_Point(0.0, 0.0)"), "geom"), "triangle_geom", "", True),
(stp.ST_DWithin, ("origin", "point", 5.0), "origin_and_point", "", True),
(stp.ST_DWithin, ("origin", "point", 5.0,), "origin_and_point", "", True),
(stp.ST_DWithin, ("ny", "seattle", 4000000.0, True), "ny_seattle", "", True),

# aggregates
(sta.ST_Envelope_Aggr, ("geom",), "exploded_points", "", "POLYGON ((0 0, 0 1, 1 1, 1 0, 0 0))"),
Expand Down Expand Up @@ -436,6 +437,8 @@ def base_df(self, request):
return TestDataFrameAPI.spark.sql("SELECT ST_GeomFromWKT('LINESTRING (0 2, 1 1, 2 0)') AS line, ST_GeomFromWKT('POINT (0 0)') AS point")
elif request.param == "origin_and_point":
return TestDataFrameAPI.spark.sql("SELECT ST_GeomFromWKT('POINT (0 0)') AS origin, ST_GeomFromWKT('POINT (1 0)') as point")
elif request.param == "ny_seattle":
return TestDataFrameAPI.spark.sql("SELECT ST_GeomFromWKT('POINT (-122.335167 47.608013)') AS seattle, ST_GeomFromWKT('POINT (-73.935242 40.730610)') as ny")
raise ValueError(f"Invalid base_df name passed: {request.param}")

def _id_test_configuration(val):
Expand Down
15 changes: 15 additions & 0 deletions python/tests/sql/test_predicate.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

from tests import csv_point_input_location, csv_point1_input_location, csv_polygon1_input_location
from tests.test_base import TestBase
from pyspark.sql.functions import expr


class TestPredicate(TestBase):
Expand Down Expand Up @@ -222,3 +223,17 @@ def test_st_dwithin(self):
test_table.createOrReplaceTempView("test_table")
isWithin = self.spark.sql("select ST_DWithin(origin, point_1, 3) from test_table").head()[0]
assert isWithin is True

def test_dwithin_use_sphere(self):
test_table = self.spark.sql("select ST_GeomFromWKT('POINT (-122.335167 47.608013)') as seattle, ST_GeomFromWKT('POINT (-73.935242 40.730610)') as ny")
test_table.createOrReplaceTempView("test_table")
isWithin = self.spark.sql("select ST_DWithin(seattle, ny, 2000000, true) from test_table").head()[0]
assert isWithin is False


def test_dwithin_use_sphere_complex_boolean_expression(self):
expected = 55
df_point = self.spark.range(10).withColumn("pt", expr("ST_Point(id, id)"))
df_polygon = self.spark.range(10).withColumn("poly", expr("ST_Point(id, id + 0.01)"))
actual = df_point.alias("a").join(df_polygon.alias("b"), expr("ST_DWithin(pt, poly, 10000, a.`id` % 2 = 0)")).count()
assert expected == actual
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,7 @@ case class ST_OrderingEquals(inputExpressions: Seq[Expression])
}

case class ST_DWithin(inputExpressions: Seq[Expression])
extends InferredExpression(Predicates.dWithin _) {
extends InferredExpression(inferrableFunction3(Predicates.dWithin), inferrableFunction4(Predicates.dWithin)) {

protected def withNewChildrenInternal(newChildren: IndexedSeq[Expression]) = {
copy(inputExpressions = newChildren)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ object st_predicates extends DataFrameAPI {
def ST_CoveredBy(a: Column, b: Column): Column = wrapExpression[ST_CoveredBy](a, b)
def ST_CoveredBy(a: String, b: String): Column = wrapExpression[ST_CoveredBy](a, b)
def ST_DWithin(a: Column, b: Column, distance: Column): Column = wrapExpression[ST_DWithin](a, b, distance)

def ST_DWithin(a: String, b: String, distance: Double): Column = wrapExpression[ST_DWithin](a, b, distance)
def ST_DWithin(a: Column, b: Column, distance: Column, useSphere: Column): Column = wrapExpression[ST_DWithin](a, b, distance, useSphere)
def ST_DWithin(a: String, b: String, distance: Double, useSphere: Boolean): Column = wrapExpression[ST_DWithin](a, b, distance, useSphere)
}
Original file line number Diff line number Diff line change
Expand Up @@ -126,12 +126,6 @@ class JoinQueryDetector(sparkSession: SparkSession) extends Strategy {
getJoinDetection(left, right, predicate, Some(extraCondition))
case Some(And(extraCondition, predicate: ST_Predicate)) =>
getJoinDetection(left, right, predicate, Some(extraCondition))
case Some(ST_DWithin(Seq(leftShape, rightShape, distance))) =>
Some(JoinQueryDetection(left, right, leftShape, ST_Buffer(Seq(rightShape, distance)), SpatialPredicate.INTERSECTS, isGeography = false, condition, None))
case Some(And(ST_DWithin(Seq(leftShape, rightShape, distance)), extraCondition)) =>
Some(JoinQueryDetection(left, right, leftShape, ST_Buffer(Seq(rightShape, distance)), SpatialPredicate.INTERSECTS, isGeography = false, condition, Some(extraCondition)))
case Some(And(extraCondition, ST_DWithin(Seq(leftShape, rightShape, distance)))) =>
Some(JoinQueryDetection(left, right, leftShape, ST_Buffer(Seq(rightShape, distance)), SpatialPredicate.INTERSECTS, isGeography = false, condition, Some(extraCondition)))
//For raster-vector joins
case Some(predicate: RS_Predicate) =>
getRasterJoinDetection(left, right, predicate, None)
Expand All @@ -140,6 +134,36 @@ class JoinQueryDetector(sparkSession: SparkSession) extends Strategy {
case Some(And(extraCondition, predicate: RS_Predicate)) =>
getRasterJoinDetection(left, right, predicate, Some(extraCondition))
// For distance joins we execute the actual predicate (condition) and not only extraConditions.
case Some(ST_DWithin(Seq(leftShape, rightShape, distance))) =>
Some(JoinQueryDetection(left, right, leftShape, rightShape, SpatialPredicate.INTERSECTS, isGeography = false, condition, Some(distance)))
case Some(And(ST_DWithin(Seq(leftShape, rightShape, distance)), _)) =>
Some(JoinQueryDetection(left, right, leftShape, rightShape, SpatialPredicate.INTERSECTS, isGeography = false, condition, Some(distance)))
case Some(And(_, ST_DWithin(Seq(leftShape, rightShape, distance)))) =>
Some(JoinQueryDetection(left, right, leftShape, rightShape, SpatialPredicate.INTERSECTS, isGeography = false, condition, Some(distance)))
case Some(ST_DWithin(Seq(leftShape, rightShape, distance, useSpheroid))) =>
try {
val useSpheroidUnwrapped = useSpheroid.eval().asInstanceOf[Boolean]
Some(JoinQueryDetection(left, right, leftShape, rightShape, SpatialPredicate.INTERSECTS, isGeography = useSpheroidUnwrapped, condition, Some(distance)))
}catch {
case _: Throwable =>
Some(JoinQueryDetection(left, right, leftShape, rightShape, SpatialPredicate.INTERSECTS, isGeography = false, condition, Some(distance)))
}
case Some(And(ST_DWithin(Seq(leftShape, rightShape, distance, useSpheroid)), _)) =>
try {
val useSpheroidUnwrapped = useSpheroid.eval().asInstanceOf[Boolean]
Some(JoinQueryDetection(left, right, leftShape, rightShape, SpatialPredicate.INTERSECTS, isGeography = useSpheroidUnwrapped, condition, Some(distance)))
}catch {
case _: Throwable =>
Some(JoinQueryDetection(left, right, leftShape, rightShape, SpatialPredicate.INTERSECTS, isGeography = false, condition, Some(distance)))
}
case Some(And(_, ST_DWithin(Seq(leftShape, rightShape, distance, useSpheroid)))) =>
try {
val useSpheroidUnwrapped = useSpheroid.eval().asInstanceOf[Boolean]
Some(JoinQueryDetection(left, right, leftShape, rightShape, SpatialPredicate.INTERSECTS, isGeography = useSpheroidUnwrapped, condition, Some(distance)))
}catch {
case _: Throwable =>
Some(JoinQueryDetection(left, right, leftShape, rightShape, SpatialPredicate.INTERSECTS, isGeography = false, condition, Some(distance)))
}
case Some(LessThanOrEqual(ST_Distance(Seq(leftShape, rightShape)), distance)) =>
Some(JoinQueryDetection(left, right, leftShape, rightShape, SpatialPredicate.INTERSECTS, false, condition, Some(distance)))
case Some(And(LessThanOrEqual(ST_Distance(Seq(leftShape, rightShape)), distance), _)) =>
Expand Down Expand Up @@ -397,13 +421,15 @@ class JoinQueryDetector(sparkSession: SparkSession) extends Strategy {
val b = children.tail.head
val isRasterPredicate = a.dataType.isInstanceOf[RasterUDT] || b.dataType.isInstanceOf[RasterUDT]

val relationship = (distance, spatialPredicate, isGeography, isRasterPredicate) match {
case (Some(_), SpatialPredicate.INTERSECTS, false, false) => "ST_Distance <="
case (Some(_), _, false, false) => "ST_Distance <"
case (Some(_), SpatialPredicate.INTERSECTS, true, false) => "ST_Distance (Geography) <="
case (Some(_), _, true, false) => "ST_Distance (Geography) <"
case (None, _, false, false) => s"ST_$spatialPredicate"
case (None, _, false, true) => s"RS_$spatialPredicate"
val relationship = (distance, spatialPredicate, isGeography, extraCondition, isRasterPredicate) match {
case (Some(_), SpatialPredicate.INTERSECTS, false, Some(ST_DWithin(Seq(_*))), false) => "ST_DWithin"
case (Some(_), SpatialPredicate.INTERSECTS, false, _, false) => "ST_Distance <="
case (Some(_), _, false, _, false) => "ST_Distance <"
case (Some(_), SpatialPredicate.INTERSECTS, true, Some(ST_DWithin(Seq(_*))), false) => "ST_DWithin(useSpheroid = true)"
case (Some(_), SpatialPredicate.INTERSECTS, true, _, false) => "ST_Distance (Geography) <="
case (Some(_), _, true, _, false) => "ST_Distance (Geography) <"
case (None, _, false, _, false) => s"ST_$spatialPredicate"
case (None, _, false,_, true) => s"RS_$spatialPredicate"
}
val (distanceOnIndexSide, distanceOnStreamSide) = distance.map { distanceExpr =>
matchDistanceExpressionToJoinSide(distanceExpr, left, right) match {
Expand Down
Loading
Loading