Skip to content

Commit

Permalink
Implement geom_equals and binary predicates that depend only on it. (
Browse files Browse the repository at this point in the history
…#926)

This PR implements binary predicates that depend only on equality, which is implemented here using columnar comparison in python.

I'm playing with benchmarks of this feature now. On only Point geometries, we begin to outperform geopandas at 50k points, with 60x performance at 10m points.

Authors:
  - H. Thomson Comer (https://github.com/thomcom)

Approvers:
  - Michael Wang (https://github.com/isVoid)

URL: #926
  • Loading branch information
thomcom authored Mar 23, 2023
1 parent 515e0c4 commit 04c941a
Show file tree
Hide file tree
Showing 4 changed files with 1,308 additions and 24 deletions.
309 changes: 301 additions & 8 deletions python/cuspatial/cuspatial/core/binpreds/binpreds.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,40 @@

import cudf

import cuspatial
from cuspatial.core._column.geocolumn import GeoColumn
from cuspatial.core.binpreds.contains import contains_properly
from cuspatial.utils.column_utils import (
contains_only_linestrings,
contains_only_multipoints,
contains_only_points,
contains_only_polygons,
has_multipolygons,
has_same_geometry,
)


class PreprocessorOutput:
"""The output of the preprocess method of a binary predicate.
This makes it possible to create a class that matches the necessary
signature of a geoseries.GeoColumnAccessor object. In some cases the
preprocessor may need to reorder the input data, in which case the
preprocessor will return a PreprocessorOutput object instead of a
GeoColumnAccessor."""

def __init__(self, coords, indices) -> None:
self.vertices = coords
self.indices = indices

@property
def xy(self):
return self.vertices

def point_indices(self):
return self.indices


class BinaryPredicate(ABC):
"""BinaryPredicate is an abstract class that implements the binary
predicate algorithm. The binary predicate algorithm is used to compute
Expand Down Expand Up @@ -58,10 +81,12 @@ def preprocess(self, lhs, rhs):
The right-hand-side of the internal binary predicate, may be
reordered.
"""
pass
return (lhs, rhs, cudf.RangeIndex(len(rhs)))

@abstractmethod
def postprocess(self, point_indices, point_result):
def postprocess(
self, point_indices: cudf.Series, point_result: cudf.Series
) -> cudf.Series:
"""Postprocess the output data for the binary predicate. This method
should be implemented by subclasses.
Expand Down Expand Up @@ -134,6 +159,15 @@ def __init__(self, lhs, rhs, align=True):
(self.lhs, self.rhs) = lhs.align(rhs) if align else (lhs, rhs)
self.align = align

def _cancel_op(self, lhs, rhs, result):
"""Used to disable computation of the binary predicate.
This occurs when the binary predicate is not supported for the
input types, and a final result can be computed only using
`preprocess` and `postprocess`."""
self._op = lambda x, y: result
return (lhs, rhs, result)

def __call__(self) -> cudf.Series:
"""Return the result of the binary predicate."""
# Type disambiguation
Expand Down Expand Up @@ -178,9 +212,9 @@ def preprocess(self, lhs, rhs):
# Arrange into shape for calling point-in-polygon, intersection, or
# equals
point_indices = geom.point_indices()
from cuspatial.core.geoseries import GeoSeries

final_rhs = GeoSeries(GeoColumn._from_points_xy(xy_points._column))
final_rhs = cuspatial.core.geoseries.GeoSeries(
GeoColumn._from_points_xy(xy_points._column)
)
return (lhs, final_rhs, cudf.Series(point_indices))

def _should_use_quadtree(self):
Expand Down Expand Up @@ -391,14 +425,16 @@ def _postprocess_brute_force_result(self, point_indices, point_result):

# Result can be:
# A Dataframe of booleans with n_points rows and up to 31 columns.
# Discrete math recombination
if (
contains_only_linestrings(self.rhs)
or contains_only_polygons(self.rhs)
or contains_only_multipoints(self.rhs)
):
# process for completed linestrings, polygons, and multipoints.
# Not necessary for points.
# Compute the set of results for each point-in-polygon predicate.
# Group them by the original index, and sum the results. If the
# sum of points in the rhs feature is equal to the number of
# points found in the polygon, then the polygon contains the
# feature.
point_result["idx"] = point_indices
group_result = (
point_result.groupby("idx").sum().sort_index()
Expand Down Expand Up @@ -487,3 +523,260 @@ def preprocess(self, lhs, rhs):
if contains_only_polygons(rhs):
(self.lhs, self.rhs) = (rhs, lhs)
return super().preprocess(rhs, lhs)

def postprocess(self, point_indices, point_result):
"""Postprocess the output GeoSeries to ensure that they are of the
correct type for the predicate."""
(hits, expected_count,) = self._count_results_in_multipoint_geometries(
point_indices, point_result
)
result_df = hits.reset_index().merge(
expected_count.reset_index(), on="rhs_index"
)
result_df["feature_in_polygon"] = (
result_df["point_index_x"] >= result_df["point_index_y"]
)
final_result = cudf.Series(
[False] * (point_indices.max().item() + 1)
) # point_indices is zero index
final_result.loc[
result_df["rhs_index"][result_df["feature_in_polygon"]]
] = True
return final_result


class EqualsBinpred(BinaryPredicate):
def _offset_equals(self, lhs, rhs):
"""Compute the pairwise length equality of two offset arrays"""
lhs_lengths = lhs[:-1] - lhs[1:]
rhs_lengths = rhs[:-1] - rhs[1:]
return lhs_lengths == rhs_lengths

def _sort_interleaved_points_by_offset(self, coords, offsets, sort_order):
"""Sort xy according to bins defined by offset. Sort order is a list
of column names to sort by.
`_sort_interleaved_points_by_offset` creates a dataframe with the
following columns:
"sizes": an index for each object represented in `coords`.
"points": an index for each point in `coords`.
"xy_key": an index that maintains x/y ordering.
"xy": the x/y coordinates in `coords`.
The dataframe is sorted according to keys passed in by the caller.
For sorting multipoints, the keys in order are "object_key", "xy",
"xy_key". This sorts the points in each multipoint into the same
bin defined by "object_key", then sorts the points in each bin by
x/y coordinates, and finally sorts the points in each bin by the
`xy_key` which maintains that the x coordinate precedes the y
coordinate.
For sorting linestrings, the keys in order are "object_key",
"point_key", "xy_key". This sorts the points in each linestring
into the same bin defined by "object_key", then sorts the points
in each bin by point ordering, and finally sorts the points in
each bin by x/y ordering.
Parameters
----------
coords : cudf.Series
interleaved x,y coordinates
offsets : cudf.Series
offsets into coords
sort_order : list
list of column names to sort by. One of "object_key", "point_key",
"xy_key", and "xy".
Returns
-------
cudf.Series
sorted interleaved x,y coordinates
"""
sizes = offsets[1:] - offsets[:-1]
object_key = (
cudf.Series(cp.arange(len(sizes)))
.repeat(sizes * 2)
.reset_index(drop=True)
)
point_key = cp.arange(len(coords) // 2).repeat(2)[::-1]
xy_key = cp.tile([0, 1], len(coords) // 2)
sorting_df = cudf.DataFrame(
{
"object_key": object_key,
"point_key": point_key,
"xy_key": xy_key,
"xy": coords,
}
)
sorted_df = sorting_df.sort_values(by=sort_order).reset_index(
drop=True
)
return sorted_df["xy"]

def _sort_multipoint_series(self, coords, offsets):
"""Sort xy according to bins defined by offset"""
result = self._sort_interleaved_points_by_offset(
coords, offsets, ["object_key", "xy", "xy_key"]
)
result.name = None
return result

def _sort_multipoints(self, lhs, rhs, initial):
lhs_sorted = self._sort_multipoint_series(
lhs.multipoints.xy, lhs.multipoints.geometry_offset
)
rhs_sorted = self._sort_multipoint_series(
rhs.multipoints.xy, rhs.multipoints.geometry_offset
)
lhs_result = cuspatial.core.geoseries.GeoSeries.from_multipoints_xy(
lhs_sorted, lhs.multipoints.geometry_offset
)
rhs_result = cuspatial.core.geoseries.GeoSeries.from_multipoints_xy(
rhs_sorted, rhs.multipoints.geometry_offset
)
lhs_result.index = lhs.index
rhs_result.index = rhs.index
return (
lhs_result,
rhs_result,
initial,
)

def _reverse_linestrings(self, coords, offsets):
"""Reverse the order of coordinates in a Arrow buffer of coordinates
and offsets."""
result = self._sort_interleaved_points_by_offset(
coords, offsets, ["object_key", "point_key", "xy_key"]
)
result.name = None
return result

def _compare_linestrings_and_reversed(self, lhs, rhs, initial):
"""Compare linestrings with their reversed counterparts."""
lhs_xy = self._reverse_linestrings(lhs.xy, lhs.part_offset)
rhs_xy = self._reverse_linestrings(rhs.xy, rhs.part_offset)
return (
PreprocessorOutput(lhs_xy, lhs.point_indices()),
PreprocessorOutput(rhs_xy, rhs.point_indices()),
initial,
)

def preprocess(self, lhs, rhs):
# Compare types
type_compare = lhs.feature_types == rhs.feature_types
# Any unmatched type is not equal
if (type_compare == False).all(): # noqa: E712
# Override _op so that it will not be run.
return self._cancel_op(lhs, rhs, type_compare)
# Get indices of matching types
if contains_only_multipoints(lhs):
lengths_equal = self._offset_equals(
lhs.multipoints.geometry_offset,
rhs.multipoints.geometry_offset,
)
if lengths_equal.any():
# Multipoints are equal if they contains the
# same unordered points.
return self._sort_multipoints(
lhs[lengths_equal],
rhs[lengths_equal],
lengths_equal,
)
else:
# No lengths are equal, so none can be equal.
return self._cancel_op(lhs, rhs, lengths_equal)
elif contains_only_linestrings(lhs):
lengths_equal = self._offset_equals(
lhs.lines.part_offset, rhs.lines.part_offset
)
if lengths_equal.any():
return (
lhs[lengths_equal],
rhs[lengths_equal],
lengths_equal,
)
else:
return self._cancel_op(lhs, rhs, lengths_equal)
elif contains_only_polygons(lhs):
raise NotImplementedError
elif contains_only_points(lhs):
return (lhs, rhs, type_compare)

def postprocess(self, lengths_equal, point_result):
# if point_result is not a Series, preprocessing terminated
# the results early.
if isinstance(point_result, cudf.Series):
point_result = point_result.sort_index()
lengths_equal[point_result.index] = point_result
return cudf.Series(lengths_equal)

def _vertices_equals(self, lhs, rhs):
"""Compute the equals relationship between interleaved xy
coordinate buffers."""
length = min(len(lhs), len(rhs))
a = lhs[:length:2]._column == rhs[:length:2]._column
b = rhs[1:length:2]._column == lhs[1:length:2]._column
return a & b

def _op(self, lhs, rhs):
if contains_only_linestrings(lhs):
# Linestrings can be compared either forward or
# reversed. We need to compare both directions.
lhs_reversed = self._reverse_linestrings(
lhs.lines.xy, lhs.lines.part_offset
)
forward_result = self._vertices_equals(lhs.lines.xy, rhs.lines.xy)
reverse_result = self._vertices_equals(lhs_reversed, rhs.lines.xy)
result = forward_result | reverse_result
elif contains_only_multipoints(lhs):
result = self._vertices_equals(
lhs.multipoints.xy, rhs.multipoints.xy
)
elif contains_only_points(lhs):
result = self._vertices_equals(lhs.points.xy, rhs.points.xy)
elif contains_only_polygons(lhs):
raise NotImplementedError
indices = lhs.point_indices
result_df = cudf.DataFrame(
{"idx": indices[: len(result)], "equals": result}
)
gb_idx = result_df.groupby("idx")
result = (gb_idx.sum().sort_index() == gb_idx.count().sort_index())[
"equals"
]
result.index = lhs.index
result.index.name = None
result.name = None
return result


class CrossesBinpred(EqualsBinpred):
"""An object is said to cross other if its interior intersects the
interior of the other but does not contain it, and the dimension of
the intersection is less than the dimension of the one or the other.
This is only implemented for `points.crosses(points)` at this time.
"""

def postprocess(self, point_indices, point_result):
if has_same_geometry(self.lhs, self.rhs) and contains_only_points(
self.lhs
):
return cudf.Series([False] * len(self.lhs))
df_result = cudf.DataFrame({"idx": point_indices, "pip": point_result})
point_result = cudf.Series(
df_result["pip"], index=cudf.RangeIndex(0, len(df_result))
)
point_result.name = None
return point_result


class CoversBinpred(EqualsBinpred):
"""An object A covers another B if no points of B lie in the exterior
of A, and at least one point of the interior of B lies in the interior
of A.
This is only implemented for `points.covers(points)` at this time."""

def postprocess(self, point_indices, point_result):
return cudf.Series(point_result, index=point_indices)
Loading

0 comments on commit 04c941a

Please sign in to comment.