-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathfuncs.py
215 lines (180 loc) · 9.72 KB
/
funcs.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
import typing
import numpy as np
import pipe
def reshaper(values: typing.List[np.ndarray],
             reshaping: typing.Optional[typing.Dict[int, tuple]] = None):
    """
    Reshape selected arrays of ``values``.

    The sequence passed by the caller is never modified: when at least one
    reshape is requested, the work is done on a shallow copy, so any indexable
    sequence (e.g., a tuple) is accepted as well.

    :param values: sequence of arrays, addressed by position.
    :param reshaping: maps a position in ``values`` to the shape the array at
        that position must be given. ``None`` (or an empty mapping) means that
        nothing has to be reshaped.
    :return: ``values`` itself when nothing has to be reshaped, otherwise a new
        list holding the reshaped arrays together with the untouched ones.
    """
    if not reshaping:
        return values
    # shallow copy: only the container is duplicated, the arrays are shared.
    reshaped = list(values)
    for position, new_shape in reshaping.items():
        reshaped[position] = reshaped[position].reshape(new_shape)
    return reshaped
def post_aggregation_take_col_and_where(col_idx: int, divider: float, left, right) -> pipe.AggregationFuncPair:
    """
    Build a post-aggregation pair that thresholds one column of an array.

    :param col_idx: index, along the last axis, of the column to compare.
    :param divider: threshold the selected column is compared against (strict ``<``).
    :param left: value selected where the column is below ``divider``.
    :param right: value selected everywhere else.
    :return: a ``pipe.AggregationFuncPair`` wrapping the thresholding function
        and the function preparing its arguments.
    """

    def pick_side(arr, divider_val):
        # element-wise choice between ``left`` and ``right``.
        below = arr[..., col_idx] < divider_val
        return np.where(below, left, right)

    def build_args(arr):
        # only the first item of the input is forwarded, paired with the threshold.
        return pipe.ArgFuncOutput(args=(arr[0], divider))

    return pipe.AggregationFuncPair(
        aggregation_func=pick_side,
        aggregation_func_arg_func=build_args,
    )
def flattener(values: typing.Iterable[pipe.StepInput]) -> pipe.ArgFuncOutput:
    """
    Gather the ``X`` attribute of every step input into one positional argument.

    No reshaping and no validation is performed: each ``X`` is forwarded as is.

    :param values: the step inputs to read ``X`` from.
    :return: a ``pipe.ArgFuncOutput`` whose ``args`` is a 1-tuple containing the
        list of all the ``X``, in iteration order.
    """
    collected = []
    for step_input in values:
        collected.append(step_input.X)
    # the list is wrapped in a tuple so that it is passed as a single argument.
    return pipe.ArgFuncOutput(args=(collected,))
def StepAggregateWeightedSum(name: str, to_aggregate: typing.List[int],
                             weights: typing.List[float],
                             reshaping: typing.Optional[typing.Dict[int, tuple]] = None,
                             output_col_names_pre: typing.Optional[typing.List[str]] = None,
                             output_col_names_post: typing.Optional[typing.List[str]] = None,
                             post_aggregation: typing.Optional[pipe.AggregationFuncPair] = None) -> pipe.Step:
    """
    Create a step computing a weighted sum. TO BE DONE AFTER FLATTENING.

    The step function optionally reshapes its inputs, stacks them horizontally,
    multiplies the result by ``weights`` and sums along axis 1.

    :param name: forwarded to ``pipe.Step`` as ``name``.
    :param to_aggregate: forwarded to ``pipe.Step`` as ``steps_to_aggregate``.
    :param weights: multiplied (with numpy broadcasting) with the stacked inputs.
    :param reshaping: optional mapping position -> shape, applied to the inputs
        of the step function before stacking.
    :param output_col_names_pre: forwarded to ``pipe.Step``.
    :param output_col_names_post: forwarded to ``pipe.Step``.
    :param post_aggregation: forwarded to ``pipe.Step``.
    :return: the configured ``pipe.Step``.
    :raises ValueError: if ``to_aggregate`` or ``weights`` is ``None``.
    """
    if any(required is None for required in (to_aggregate, weights)):
        raise ValueError('\'to_aggregated\' and \'weights\' cannot be None')

    def step_func(values: typing.List[np.ndarray]) -> np.ndarray:
        prepared = reshaper(values, reshaping)
        side_by_side = np.hstack(prepared)
        weighted = side_by_side * weights
        return np.sum(weighted, axis=1)

    return pipe.Step(
        name=name,
        step=step_func,
        steps_to_aggregate=to_aggregate,
        arg_func=flattener,
        output_col_names_pre=output_col_names_pre,
        output_col_names_post=output_col_names_post,
        post_aggregation=post_aggregation,
    )
def StepAggregateWeightedAverage(name: str, to_aggregate: typing.List[int],
                                 weights: typing.Optional[typing.List[float]],
                                 reshaping: typing.Optional[typing.Dict[int, tuple]] = None,
                                 output_col_names_pre: typing.Optional[typing.List[str]] = None,
                                 output_col_names_post: typing.Optional[typing.List[str]] = None,
                                 post_aggregation: typing.Optional[pipe.AggregationFuncPair] = None):
    """
    Create a step computing a (weighted) average. TO BE DONE AFTER FLATTENING.

    The step function optionally reshapes its inputs, stacks them vertically
    and averages along axis 1 with ``np.average``.

    :param name: forwarded to ``pipe.Step`` as ``name``.
    :param to_aggregate: forwarded to ``pipe.Step`` as ``steps_to_aggregate``.
    :param weights: passed to ``np.average`` as ``weights``; may be ``None``.
    :param reshaping: optional mapping position -> shape, applied to the inputs
        of the step function before stacking.
    :param output_col_names_pre: forwarded to ``pipe.Step``.
    :param output_col_names_post: forwarded to ``pipe.Step``.
    :param post_aggregation: forwarded to ``pipe.Step``.
    :return: the object built by ``pipe.Step``.
    :raises ValueError: if ``to_aggregate`` is ``None``.
    """
    if to_aggregate is None:
        raise ValueError('\'to_aggregated\' cannot be None')

    def step_func(values: typing.List[np.ndarray]):
        prepared = reshaper(values, reshaping)
        stacked = np.vstack(prepared)
        return np.average(stacked, axis=1, weights=weights)

    return pipe.Step(
        name=name,
        step=step_func,
        steps_to_aggregate=to_aggregate,
        arg_func=flattener,
        output_col_names_post=output_col_names_post,
        output_col_names_pre=output_col_names_pre,
        post_aggregation=post_aggregation,
    )
def StepAggregateMax(name: str, to_aggregate: typing.List[int],
                     reshaping: typing.Optional[typing.Dict[int, tuple]] = None,
                     output_col_names_pre: typing.Optional[typing.List[str]] = None,
                     output_col_names_post: typing.Optional[typing.List[str]] = None,
                     post_aggregation: typing.Optional[pipe.AggregationFuncPair] = None):
    """
    Create a step computing a maximum. TO BE DONE AFTER FLATTENING.

    The step function optionally reshapes its inputs, stacks them vertically
    and takes the maximum along axis 1.

    :param name: forwarded to ``pipe.Step`` as ``name``.
    :param to_aggregate: forwarded to ``pipe.Step`` as ``steps_to_aggregate``.
    :param reshaping: optional mapping position -> shape, applied to the inputs
        of the step function before stacking.
    :param output_col_names_pre: forwarded to ``pipe.Step``.
    :param output_col_names_post: forwarded to ``pipe.Step``.
    :param post_aggregation: forwarded to ``pipe.Step``.
    :return: the object built by ``pipe.Step``.
    :raises ValueError: if ``to_aggregate`` is ``None``.
    """
    if to_aggregate is None:
        raise ValueError('\'to_aggregated\' cannot be None')

    def step_func(values: typing.List[np.ndarray]):
        prepared = reshaper(values, reshaping)
        stacked = np.vstack(prepared)
        return np.max(stacked, axis=1)

    return pipe.Step(
        name=name,
        step=step_func,
        steps_to_aggregate=to_aggregate,
        arg_func=flattener,
        output_col_names_pre=output_col_names_pre,
        output_col_names_post=output_col_names_post,
        post_aggregation=post_aggregation,
    )
def StepAggregateCount(name: str, to_aggregate: typing.List[int],
                       reshaping: typing.Optional[typing.Dict[int, tuple]] = None,
                       output_col_names_pre: typing.Optional[typing.List[str]] = None,
                       output_col_names_post: typing.Optional[typing.List[str]] = None,
                       post_aggregation: typing.Optional[pipe.AggregationFuncPair] = None):
    """
    Create a step counting non-zero entries.

    The step function optionally reshapes its inputs, stacks them vertically
    and counts the non-zero entries along axis 1.

    :param name: forwarded to ``pipe.Step`` as ``name``.
    :param to_aggregate: forwarded to ``pipe.Step`` as ``steps_to_aggregate``.
    :param reshaping: optional mapping position -> shape, applied to the inputs
        of the step function before stacking.
    :param output_col_names_pre: forwarded to ``pipe.Step``.
    :param output_col_names_post: forwarded to ``pipe.Step``.
    :param post_aggregation: forwarded to ``pipe.Step``.
    :return: the object built by ``pipe.Step``.
    :raises ValueError: if ``to_aggregate`` is ``None``.
    """
    if to_aggregate is None:
        raise ValueError('\'to_aggregated\' cannot be None')

    def step_func(values: typing.List[np.ndarray]):
        prepared = reshaper(values, reshaping)
        stacked = np.vstack(prepared)
        return np.count_nonzero(stacked, axis=1)

    # Author's note on the expected input (not enforced by this code): although
    # a list is declared, the step function is expected to always receive a single
    # array resulting from the aggregation of some IoPs. For instance, with two
    # IoPs already aggregated, values[0] is expected to be (len(dataset), 2), where
    # the first column contains the data of the first IoP and the second column the
    # data of the second IoP. The output then has shape (len(dataset),).
    return pipe.Step(
        name=name,
        step=step_func,
        steps_to_aggregate=to_aggregate,
        arg_func=flattener,
        output_col_names_pre=output_col_names_pre,
        output_col_names_post=output_col_names_post,
        post_aggregation=post_aggregation,
    )
def StepAggregateFlattener(name: str, to_aggregate: typing.List[int],
                           expansion_mapper: typing.Optional[typing.Dict[int, int]] = None,
                           output_col_names_pre: typing.Optional[typing.List[str]] = None,
                           post_aggregation: typing.Optional[pipe.AggregationFuncPair] = None
                           ) -> pipe.Step:
    """
    Create a step placing the outputs of several steps side by side.

    Design note from the original author: this aggregates IoPs which are by
    themselves multidimensional. Alternatives considered were making IoPs 1d, or
    using a post-aggregator for each IoP, which should not be too difficult if
    they inherit from a common class.

    :param name: forwarded to ``pipe.Step`` as ``name``.
    :param to_aggregate: forwarded to ``pipe.Step`` as ``steps_to_aggregate``.
    :param expansion_mapper: specifies if and how to expand (i.e., repeat) some
        data: maps a position in the step function input to the number of times
        each of its elements is repeated.
    :param output_col_names_pre: forwarded to ``pipe.Step``.
    :param post_aggregation: forwarded to ``pipe.Step``.
    :return: the configured ``pipe.Step``.
    :raises ValueError: if ``to_aggregate`` is ``None``.
    """
    if to_aggregate is None:
        raise ValueError('\'to_aggregated\' cannot be None')

    def step_func(values: typing.List[np.ndarray]):
        # ``values`` is a list of arrays; per the original author, it holds one
        # item per step to aggregate, each being the stacked outputs of that step.
        mapper = {} if expansion_mapper is None else expansion_mapper
        for position, repetitions in mapper.items():
            source = values[position]
            n_rows = source.shape[0]
            # every element is repeated ``repetitions`` times in a row, then the
            # original number of rows is restored, e.g., with 2 repetitions
            #   [[5, 10, 5, 10],
            #    [10, 5, 10, 5]]
            # becomes
            #   [[ 5,  5, 10, 10,  5,  5, 10, 10],
            #    [10, 10,  5,  5, 10, 10,  5,  5]]
            repeated = np.repeat(source, repetitions)
            values[position] = repeated.reshape(n_rows, -1)

        # 1d arrays are turned into single-column 2d arrays, so that every
        # data point has at least one feature; the others are kept as they are.
        columns = [
            block.reshape(-1, 1) if len(block.shape) == 1 else block
            for block in values
        ]
        # finally, everything is placed side by side.
        return np.hstack(columns)

    return pipe.Step(
        name=name,
        step=step_func,
        steps_to_aggregate=to_aggregate,
        arg_func=flattener,
        output_col_names_pre=output_col_names_pre,
        post_aggregation=post_aggregation,
    )