-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathanalysis_utils.py
190 lines (160 loc) · 6.34 KB
/
analysis_utils.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
import numpy as np
from tqdm import tqdm
from scipy.interpolate import griddata
from scipy.spatial import KDTree
from scipy.stats import iqr
class GriddataPartial:
def __init__(self, ground: np.ndarray, method: str = 'linear') -> None:
'''
This class is a wrapper around `scipy.interpolate.griddata` to facilitate
parallelization with multiprocessing.Pool for computing ground elevation of
non-ground points.
Parameters
----------
ground : np.ndarray
The ground points in the point cloud.
method : str, optional
The method to pass to `griddata`. One of 'linear', 'cubic', or 'nearest'.
Default is 'linear'.
'''
self.ground = ground
self.method = method
def __call__(self, xi: np.ndarray) -> np.ndarray:
'''
Interpolate the z coordinate of the ground values at the points in `xi`.
Parameters
----------
xi : np.ndarray
The points on which to evaluate the interpolation. I.e. the non-ground points.
Returns
-------
np.ndarray
The array of interpolated values.
'''
return griddata(points=self.ground[:, :2], values=self.ground[:, 2], xi=xi, method=self.method)
class HeightVariation:
def __init__(self, point_data_normalized_height: np.ndarray, kdtree: KDTree, r: float, method: str) -> None:
'''
This class computes the height variation of point data and facilitates
parallelization with multiprocessing.Pool. Height variation is the absolute difference
between min and max values of normalized height within a disk of radius r.
Parameters
----------
point_data_normalized_height : np.ndarray
The point cloud after height normalization.
kdtree : scipy.spatial.KDTree
A K-d tree build from the x- and y-coordinates of the point cloud data.
This facilitates fast ball point queries.
r : float
The radius of the ball point queries.
method : str
The height variation method.
'absolute': absolute difference between min and max values.
'mad': median absolute deviation. median(|z_i - median(z)|)
'iqr': interquartile range.
'''
self.point_data = point_data_normalized_height
self.kdtree = kdtree
self.r = r
if method == 'absolute':
self.method_func = self.absolute
elif method == 'mad':
self.method_func = self.mad
elif method == 'iqr':
self.method_func = iqr
else:
raise ValueError(f'Invalid method: \'{method}\'')
def __call__(self, index_range: tuple[int, int]) -> np.ndarray:
'''
Compute the height variation of the point data between indices i and j
for `index_range = (i, j)`.
Parameters
----------
index_range : tuple[int, int]
The indices defining the range of points in `self.point_data` on which to
compute height variation.
Returns
-------
np.ndarray
An array of the height variations of points between indices i and j.
'''
start, end = index_range
point_data_xy = self.point_data[start:end][:, :2]
hv = np.zeros(len(point_data_xy))
for i, xy in enumerate(point_data_xy):
indices = self.kdtree.query_ball_point(xy, r=self.r, p=2.0)
if len(indices) == 1:
continue
points_in_disk_z = self.point_data[indices][:, 2]
hv[i] = self.method_func(points_in_disk_z)
return hv
@staticmethod
def absolute(x):
return x.max() - x.min()
@staticmethod
def mad(x):
return np.median(np.abs(x - np.median(x)))
class NormalVariation:
def __init__(
self, point_data_xy: np.ndarray, normals: np.ndarray, kdtree: KDTree, r: float, use_tqdm: bool = False
) -> None:
'''
This class computes the normal variation of point data and facilitates
parallelization with multiprocessing.Pool. Normal variation is the negative
of the average dot product of each normal with other normals within a disk of
radius r. This value gives a measure of planarity near each point.
Parameters
----------
point_data_xy : np.ndarray
The- x and y-coordinates of the point clouddata.
normals : np.ndarray
An array of the (estimated) surface normal vectors at each point.
kdtree : scipy.spatial.KDTree
A K-d tree build from the x- and y-coordinates of the point cloud data.
This facilitates fast ball point queries.
r : float
The radius of the ball point queries.
use_tqdm : bool, optional
Whether or not to print progress using tqdm. Default is False.
'''
self.point_data_xy = point_data_xy
self.kdtree = kdtree
self.normals = normals
self.r = r
self.use_tqdm = use_tqdm
def __call__(self, index_range: tuple[int, int]) -> np.ndarray:
'''
Compute the normal variation of the point data between indices i and j
for `index_range = (i, j)`.
Parameters
----------
index_range : tuple[int, int]
The indices defining the range of points in `self.point_data` on which to
compute normal variation.
Returns
-------
np.ndarray
An array of the noraml variations of points between indices i and j.
'''
start, end = index_range
point_data_xy = self.point_data_xy[start:end]
nv = np.empty(len(point_data_xy))
for i, p in tqdm(enumerate(point_data_xy), total=len(point_data_xy), disable=not self.use_tqdm):
indices = self.kdtree.query_ball_point(p, r=self.r, p=2.0)
normal_dot_products = self.normals[indices] @ self.normals[start + i]
nv[i] = -normal_dot_products.mean()
return nv
def hag_pipeline(input_path, output_path):
'''
Defines the height above ground (hag) pipeline for height normalization with PDAl.
'''
hag_json = f'''{{
"pipeline": [
"{input_path}",
{{
"type": "filters.hag_delaunay"
}},
"{output_path}"
]
}}'''
return hag_json