feasible_zones_primary.py
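"""Identify feasible destination zones for the primary activities in a set of activity chains.

For each activity chain, candidate destination zones are those reachable (by the
reported mode) within a tolerance of the reported travel time, optionally
restricted to zones that offer the matching activity type.
"""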
from typing import Optional
import geopandas as gpd
import pandas as pd
import pandera as pa
from pandarallel import pandarallel
from pandera import Check, Column, DataFrameSchema
from pandera.errors import SchemaErrors
from acbm.assigning.utils import (
_map_day_to_wkday_binary,
_map_time_to_day_part,
zones_to_time_matrix,
)
from acbm.config import Config
from acbm.logger_config import assigning_primary_feasible_logger as logger

pandarallel.initialize(progress_bar=True)

# --- Schemas for validation
activity_chains_schema = DataFrameSchema(
{
"mode": Column(str),
"TravDay": Column(pa.Float, Check.isin([1, 2, 3, 4, 5, 6, 7]), nullable=True),
"tst": Column(pa.Float, Check.less_than_or_equal_to(1440), nullable=True),
"TripTotalTime": Column(pa.Float, nullable=True),
# TODO: add more columns ...
},
strict=False,
)

activities_per_zone_schema = DataFrameSchema(
{
"counts": Column(pa.Int),
"floor_area": Column(pa.Float),
"activity": Column(str),
},
strict=False,
)

boundaries_schema = DataFrameSchema(
{
"geometry": Column("geometry"),
},
strict=False,
)

travel_times_schema = DataFrameSchema(
{
"mode": Column(str),
"weekday": Column(pa.Float, Check.isin([0, 1]), nullable=True),
# "time_of_day": Column(str, nullable=True),
"time": Column(float),
},
strict=False,
)

input_schemas = {
"activity_chains": activity_chains_schema,
"activities_per_zone": activities_per_zone_schema,
"boundaries": boundaries_schema,
"travel_times": travel_times_schema,
}
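
# Each input to get_possible_zones is checked against the matching schema above.
# Validation is lazy so that all schema errors are collected and reported at once
# rather than stopping at the first failure.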


def get_possible_zones(
activity_chains: pd.DataFrame,
activities_per_zone: pd.DataFrame,
activity_col: str,
key_col: str,
boundaries: gpd.GeoDataFrame,
zone_id: str,
travel_times: Optional[pd.DataFrame] = None,
filter_by_activity: bool = False,
time_tolerance: float = 0.2,
) -> dict:
"""
Get possible zones for all activity chains in the dataset. This function loops over the travel_times dataframe and filters by mode, time of day and weekday/weekend.
At each loop it applies the _get_possible_zones function to each row in the activity_chains dataframe.
The travel_times dataframe is big, so doing some initial filtering before applying _get_possible_zones makes the process faster as less filtering is done for each
row when running _get_possible_zones.
Parameters
----------
activity_chains: pd.DataFrame
A dataframe with activity chains
travel_times: Optional[pd.DataFrame]
A dataframe with travel times between zones. If not provided, it will be created using zones_to_time_matrix.
activities_per_zone: pd.DataFrame
A dataframe with the number of activities and floorspace for each zone. The columns are 'OA21CD', 'counts', 'floor_area', 'activity'
where 'activity' is the activity type as defined in the osmox config file
key_col: str
The column in activity_chains that will be used as a key in the dictionary
boundaries: gpd.GeoDataFrame
A GeoDataFrame with the boundaries of the zones. Used to create the travel_times dataframe if not provided
zone_id: str
The column name of the zone id in the activity_chains dataframe
filter_by_activity: bool
If True, we will return a results that only includes destination zones that have an activity that matches the activity purpose
time_tolerance: int
The time tolerance is used to filter the travel_times dataframe to only include travel times within a certain range of the
activity chain's travel time (which is stored in "TripTotalTime"). Allowable travel_times are those that fall in the range of:
travel_time_reported * (1 - time_tolerance) <= travel_time_reported <= travel_time_reported * (1 + time_tolerance)
Default = 0.2
Returns
-------
dict
A dictionary of dictionaries. Each dictionary is for one of the rows in activity chains
with the origin zone as the key and a list of possible destination zones as the value. Eg:
{
164: {'E00059011': ['E00056917','E00056922', 'E00056923']},
165: {'E00059012': ['E00056918','E00056952', 'E00056923']}
}
"""
# Validate inputs lazily
try:
activity_chains = input_schemas["activity_chains"].validate(
activity_chains, lazy=True
)
activities_per_zone = input_schemas["activities_per_zone"].validate(
activities_per_zone, lazy=True
)
boundaries = input_schemas["boundaries"].validate(boundaries, lazy=True)
        # travel_times is optional; only validate it when a matrix is provided
        if travel_times is not None:
            travel_times = input_schemas["travel_times"].validate(
                travel_times, lazy=True
            )
except SchemaErrors as e:
print("Validation failed with errors:")
print(e.failure_cases) # prints all the validation errors at once
return None
if travel_times is None:
logger.info("Travel time matrix not provided: Creating travel times estimates")
travel_times = zones_to_time_matrix(zones=boundaries, id_col=zone_id)
list_of_modes = activity_chains["mode"].unique()
print(f"Unique modes found in the dataset are: {list_of_modes}")
    # use _map_day_to_wkday_binary to identify whether each activity is on a weekday or weekend
    activity_chains["weekday"] = activity_chains["TravDay"].apply(
        _map_day_to_wkday_binary
    )
    # day type identifiers (weekday/weekend)
    day_types = activity_chains["weekday"].unique()
    # use _map_time_to_day_part to add a time_of_day column to activity_chains
    activity_chains["time_of_day"] = activity_chains["tst"].apply(_map_time_to_day_part)
# get unique time_of_day values
list_of_times_of_day = activity_chains["time_of_day"].unique()
# Initialize a list to collect results
results_list = []
# loop over the list of modes
for mode in list_of_modes:
print(f"Processing mode: {mode}")
# filter the travel_times dataframe to only include rows with the current mode
travel_times_filtered_mode = travel_times[travel_times["mode"] == mode]
# if the mode is public transport, we need to filter the travel_times data based on time_of_day and weekday/weekend
# this only applies if we have the time_of_day column in the travel_times dataframe (not the case if we've estimated
# travel times)
if mode == "pt" and "time_of_day" in travel_times.columns:
for time_of_day in list_of_times_of_day:
print(f"Processing time of day: {time_of_day} | mode: {mode}")
for day_type in day_types:
print(
f"Processing time of day: {time_of_day} | weekday: {day_type} | mode: {mode}"
)
# filter the travel_times dataframe to only include rows with the current time_of_day and weekday
travel_times_filtered_mode_time_day = travel_times_filtered_mode[
(travel_times_filtered_mode["weekday"] == day_type)
& (travel_times_filtered_mode["time_of_day"] == time_of_day)
]
print(
"unique modes after filtering are",
travel_times_filtered_mode_time_day["mode"].unique(),
)
# filter the activity chains to the current mode, time_of_day and weekday
activity_chains_filtered = activity_chains[
(activity_chains["mode"] == mode)
& (activity_chains["time_of_day"] == time_of_day)
& (activity_chains["weekday"] == day_type)
]
if (
not travel_times_filtered_mode_time_day.empty
and not activity_chains_filtered.empty
):
                        # apply _get_possible_zones to each row in activity_chains_filtered
# pandarallel.initialize(progress_bar=True)
possible_zones = activity_chains_filtered.parallel_apply(
lambda row, tt=travel_times_filtered_mode_time_day: {
row[key_col]: _get_possible_zones(
activity=row,
travel_times=tt,
activities_per_zone=activities_per_zone,
filter_by_activity=filter_by_activity,
activity_col=activity_col,
zone_id=zone_id,
time_tolerance=time_tolerance,
)
},
axis=1,
)
results_list.extend(possible_zones)
# for all other modes, we don't care about time of day and weekday/weekend
else:
travel_times_filtered_mode_time_day = travel_times_filtered_mode
activity_chains_filtered = activity_chains[
(activity_chains["mode"] == mode)
]
if (
not travel_times_filtered_mode_time_day.empty
and not activity_chains_filtered.empty
):
# apply _get_possible_zones to each row in activity_chains_filtered
# pandarallel.initialize(progress_bar=True)
possible_zones = activity_chains_filtered.parallel_apply(
lambda row, tt=travel_times_filtered_mode_time_day: {
row[key_col]: _get_possible_zones(
activity=row,
travel_times=tt,
activities_per_zone=activities_per_zone,
filter_by_activity=filter_by_activity,
activity_col=activity_col,
zone_id=zone_id,
time_tolerance=time_tolerance,
)
},
axis=1,
)
results_list.extend(possible_zones)
# Combine all dictionaries in the list into a single dictionary
results = {}
for result in results_list:
for key, value in result.items():
results[key] = value
return results
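
# Illustrative usage (a minimal sketch; the variable names, column names, and the
# "work" purpose below are assumptions, not part of this module):
#
# possible_zones_work = get_possible_zones(
#     activity_chains=activity_chains_work,   # chains whose destination is a work activity
#     activities_per_zone=activities_per_zone,
#     activity_col="dact",                    # hypothetical column holding the activity purpose
#     key_col="id",
#     boundaries=boundaries,
#     zone_id="OA21CD",
#     travel_times=travel_times,
#     filter_by_activity=True,
#     time_tolerance=0.2,
# )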


def _get_possible_zones(
activity: pd.Series,
travel_times: pd.DataFrame,
activities_per_zone: pd.DataFrame,
filter_by_activity: bool,
activity_col: str,
zone_id: str,
time_tolerance: float = 0.2,
) -> dict:
"""
Get possible zones for a given activity chain
Parameters
----------
activity: pd.Series
A row from the activity chains dataframe. It should contain the following columns: 'tst', 'TripTotalTime', 'mode', 'OA21CD'
travel_times: pd.DataFrame
A dataframe with travel times between zones
activities_per_zone: pd.DataFrame
A dataframe with the number of activities and floorspace for each zone. The columns are 'OA21CD', 'counts', 'floor_area', 'activity'
where 'activity' is the activity type as defined in the osmox config file
filter_by_activity: bool
If True, we will return a results that only includes destination zones that have an activity that matches the activity purpose
time_tolerance: int
The time tolerance is used to filter the travel_times dataframe to only include travel times within a certain range of the
activity chain's travel time (which is stored in "TripTotalTime"). Allowable travel_times are those that fall in the range of:
travel_time_reported * (1 - time_tolerance) <= travel_time_reported <= travel_time_reported * (1 + time_tolerance)
Default = 0.2
Returns
-------
dict
A dictionary with the origin zone as the key and a list of possible destination zones as the value
"""
# get the travel time
travel_time = activity["TripTotalTime"]
# get the origin zone
origin_zone = activity[zone_id]
# get the activity purpose
activity_purpose = activity[activity_col]
    # filter the travel_times dataframe by trip origin (and, below, optionally by activity purpose)
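    # Config.origin_zone_id(zone_id) and Config.destination_zone_id(zone_id) resolve the
    # origin / destination zone id column names in the travel time matrix (e.g. a
    # destination column such as "OA21CD_to").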
travel_times_filtered_origin_mode = travel_times[
travel_times[Config.origin_zone_id(zone_id)] == origin_zone
]
# do we include only zones that have an activity that matches the activity purpose?
if filter_by_activity:
filtered_activities_per_zone = activities_per_zone[
# activities_per_zone["activity"].str.split("_").str[0] == activity_purpose
activities_per_zone["activity"] == activity_purpose
]
logger.debug(
f"Activity {activity.id}: Number of zones with activity {activity_purpose}: \
{len(filtered_activities_per_zone)}"
)
# keep only the zones that have the activity purpose
travel_times_filtered_origin_mode = travel_times_filtered_origin_mode[
travel_times_filtered_origin_mode[Config.destination_zone_id(zone_id)].isin(
filtered_activities_per_zone[zone_id]
)
]
# how many zones are reachable?
logger.debug(
f"Activity {activity.id}: Number of zones with activity {activity_purpose} \
that are reachable using reported mode: {len(travel_times_filtered_origin_mode)}"
)
# filter by reported trip time
travel_times_filtered_time = travel_times_filtered_origin_mode[
(
travel_times_filtered_origin_mode["time"]
>= travel_time - time_tolerance * travel_time
)
& (
travel_times_filtered_origin_mode["time"]
<= travel_time + time_tolerance * travel_time
)
]
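    # e.g. with travel_time = 30 and time_tolerance = 0.2, only zones whose matrix
    # time falls between 24 and 36 minutes are kept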
logger.debug(
f"Activity {activity.id}: Number of zones with activity {activity_purpose} within threshold of reported time {travel_time}: \
{len(travel_times_filtered_time)}"
)
# if travel_times_filtered_time returns an empty df, select the row with the closest time to the reported time
if travel_times_filtered_time.empty:
logger.debug(
f"Activity {activity.id}: NO zones match activity {activity_purpose} within threshold of reported time {travel_time}: \
Relaxing tolerance and getting matching zone that is closest to reported travel time"
)
travel_times_filtered_time = travel_times_filtered_origin_mode.iloc[
(travel_times_filtered_origin_mode["time"] - travel_time)
.abs()
.argsort()[:1]
]
    # create dictionary with key = origin zone and value = list of feasible destination zones
return (
travel_times_filtered_time.groupby(Config.origin_zone_id(zone_id))[
Config.destination_zone_id(zone_id)
]
.apply(list)
.to_dict()
)