import os, re, csv, json, xmltodict
import random
from collections import OrderedDict, Counter, defaultdict
from util.util import to_dict, value_or_blank, unique_values, zone_name, is_a_lot, \
lot_code, is_virtual, get_terminals, is_timezoneless, write_or_append_to_csv, \
pull_from_url, remove_field, round_to_cent, corrected_zone_name, lot_list, \
other_zones_list, numbered_reporting_zones_list, sampling_groups, \
add_element_to_set_string, add_if_new, group_by_code, numbered_zone, censor, \
build_keys
from fetch_terminals import pull_terminals
import requests
import zipfile
from io import BytesIO # Works only under Python 3
from copy import copy
import time, pytz
from pprint import pprint
from datetime import datetime, timedelta
from dateutil import parser
#from util.db_util import create_or_connect_to_db, get_tables_from_db, get_ps_for_day as db_get_ps_for_day
from util.sqlite_util import get_events_from_sqlite, bulk_upsert_to_sqlite, bulk_upsert_to_sqlite_local, time_to_field, mark_date_as_cached, is_date_cached, mark_utc_date_as_cached, is_utc_date_cached
from notify import send_to_slack
import config # To define a file-crossing global like global_terminal_ids_without_groups.
#from util.carto_util import update_map
from parameters.credentials_file import CALE_API_user, CALE_API_password
from parameters.local_parameters import path, SETTINGS_FILE
from parameters.remote_parameters import BASE_URL
from pipe.pipe_to_CKAN_resource import send_data_to_pipeline, get_connection_parameters, TransactionsSchema, SplitTransactionsSchema, SamplingTransactionsSchema, SplitSamplingTransactionsSchema, OccupancySchema
from pipe.gadgets import get_resource_data
from nonchalance import add_hashes
DEFAULT_TIMECHUNK = timedelta(minutes=10)
last_date_cache = None
all_day_ps_cache = []
dts_cache = []
last_utc_date_cache = None
utc_ps_cache = []
utc_dts_cache = []
global_warnings = defaultdict(int)
temp_zone_info = {'344 - 18th & Carson Lot': {'Latitude': 40.428484093957401,
'Longitude': -79.98027965426445},
'345 - 20th & Sidney Lot': {'Latitude': 40.429380412222464,
'Longitude': -79.980572015047073},
'343 - 19th & Carson Lot': {'Latitude': 40.428526970691195,
'Longitude': -79.978395402431488},
                  '345 - 20th & Sidney Lot': {'Latitude': 40.429216054112679, # [ ] Duplicate key: this repeats the '345 - 20th & Sidney Lot' entry above and silently overwrites it.
'Longitude': -79.977073073387146},
'338 - 42nd & Butler Lot': {'Latitude': 40.47053200000002,
'Longitude': -79.960346247850453},
'337 - 52nd & Butler Lot': {'Latitude': 40.481067498214522,
'Longitude': -79.953901635581985},
'311 - Ansley Beatty Lot': {'Latitude': 40.463049472104458,
'Longitude': -79.926414303372439},
'355 - Asteroid Warrington Lot': {'Latitude': 40.421746663239325,
'Longitude': -79.993341658895474},
'425 - Bakery Sq': {'Latitude': 40.4560281126722,
'Longitude': -79.916535012428085},
'321 - Beacon Bartlett Lot': {'Latitude': 40.435453694403037,
'Longitude': -79.923617310019822},
'363 - Beechview Lot': {'Latitude': 40.411083915458534,
'Longitude': -80.024386919130848},
'418 - Beechview': {'Latitude': 40.409913479391079,
'Longitude': -80.024733782184967},
'406 - Bloomfield (On-street)': {'Latitude': 40.461946760727805,
'Longitude': -79.946826139799441},
'361 - Brookline Lot': {'Latitude': 40.392674122243058,
'Longitude': -80.018725208992691},
'419 - Brookline': {'Latitude': 40.393688357340416,
'Longitude': -80.019989138111754},
'351 - Brownsville & Sandkey Lot': {'Latitude': 40.384849483758344,
'Longitude': -79.977419455740346},
'416 - Carrick': {'Latitude': 40.386373443728381,
'Longitude': -79.97945490478287},
'329 - Centre Craig': {'Latitude': 40.45168996155256,
'Longitude': -79.95195418596267},
'323 - Douglas Phillips Lot': {'Latitude': 40.432617056862256,
'Longitude': -79.922537281579963},
'401 - Downtown 1': {'Latitude': 40.441775562513982,
'Longitude': -79.998573266419925},
'402 - Downtown 2': {'Latitude': 40.438541198850679,
'Longitude': -80.001387482255666},
'342 - East Carson Lot': {'Latitude': 40.42911498849881,
'Longitude': -79.98570442199707},
'412 - East Liberty': {'Latitude': 40.460954767837613,
'Longitude': -79.926159897229695},
'371 - East Ohio Street Lot': {'Latitude': 40.454243200345864,
'Longitude': -79.999740015542329},
'307 - Eva Beatty Lot': {'Latitude': 40.461651797420089,
'Longitude': -79.927785198164941},
'324 - Forbes Murray Lot': {'Latitude': 40.438609122362699,
'Longitude': -79.922507232308064},
'322 - Forbes Shady Lot': {'Latitude': 40.438602290037359,
'Longitude': -79.920121894069666},
'335 - Friendship Cedarville Lot': {'Latitude': 40.462314291429955,
'Longitude': -79.948193852761278},
'331 - Homewood Zenith Lot': {'Latitude': 40.455562043993496,
'Longitude': -79.89687910306202},
'328 - Ivy Bellefonte Lot': {'Latitude': 40.45181388509701,
'Longitude': -79.933232609325415},
'325 - JCC/Forbes Lot': {'Latitude': 40.437756155476606,
'Longitude': -79.923901042327884},
'405 - Lawrenceville': {'Latitude': 40.467721251303139,
'Longitude': -79.963118098839757},
'369 - Main/Alexander Lot': {'Latitude': 40.440717969032434,
'Longitude': -80.03386820671949},
'414 - Mellon Park': {'Latitude': 40.45172469595348,
'Longitude': -79.919594841104498},
'420 - Mt. Washington': {'Latitude': 40.432932025800348,
'Longitude': -80.010913107390707},
'422 - Northshore': {'Latitude': 40.447064541266613,
'Longitude': -80.008874122734966},
'421 - NorthSide': {'Latitude': 40.454215096885378,
'Longitude': -80.008679951361657},
'407 - Oakland 1': {'Latitude': 40.440712434300536,
'Longitude': -79.962027559420548},
'408 - Oakland 2': {'Latitude': 40.443878246794903,
'Longitude': -79.956351936149389},
'409 - Oakland 3': {'Latitude': 40.447221532200416,
'Longitude': -79.951424734414488},
'410 - Oakland 4': {'Latitude': 40.441311089931347,
'Longitude': -79.94689005613327},
'375 - Oberservatory Hill Lot': {'Latitude': 40.490002153374341,
'Longitude': -80.018556118011475},
'314 - Penn Circle NW Lot': {'Latitude': 40.463423581089359,
'Longitude': -79.926107418017466},
'411 - Shadyside': {'Latitude': 40.455189648283827,
'Longitude': -79.935153703219399},
'301 - Sheridan Harvard Lot': {'Latitude': 40.462616226637564,
'Longitude': -79.923065044145574},
'302 - Sheridan Kirkwood Lot': {'Latitude': 40.46169199390453,
'Longitude': -79.922711968915323},
'357 - Shiloh Street Lot': {'Latitude': 40.429924701959528,
'Longitude': -80.007599227402991},
'415 - SS & SSW': {'Latitude': 40.428051479201962,
'Longitude': -79.975047048707509},
'413 - Squirrel Hill': {'Latitude': 40.433581368049765,
'Longitude': -79.92309870425791},
'404 - Strip Disctrict': {'Latitude': 40.45040837184569,
'Longitude': -79.985526114383774},
'304 - Tamello Beatty Lot': {'Latitude': 40.46097078534487,
'Longitude': -79.927121205522525},
'334 - Taylor Street Lot': {'Latitude': 40.463318543844693,
'Longitude': -79.950406186508189},
'403 - Uptown': {'Latitude': 40.439793439383763,
'Longitude': -79.984900553021831},
'354 - Walter/Warrington Lot': {'Latitude': 40.42172215989536,
'Longitude': -79.995026086156827},
'423 - West End': {'Latitude': 40.441325754999475,
'Longitude': -80.033656060668363}}
# 341 - 18th & Sidney is missing from this list.
def get_zone_info(server):
"""Gather useful parameters about each zone (or lot) into a zone_info dictionary."""
from parameters.remote_parameters import spot_counts_resource_id, lease_counts_resource_id
zone_info_cache_file = 'zone_info.csv'
try:
settings, site, package_id, API_key = get_connection_parameters(server, SETTINGS_FILE)
records = get_resource_data(site,spot_counts_resource_id,API_key=API_key,count=10000)
lease_rows = get_resource_data(site,lease_counts_resource_id,API_key=API_key,count=10000)
except:
print("Unable to download the zone/lot/lease information. Falling back to the cached file.")
with open(zone_info_cache_file) as zic:
list_of_ds = csv.DictReader(zic)
zone_info = {}
for d in list_of_ds:
zone = d['zone']
zone_info[zone] = d
del(zone_info[zone]['zone'])
return zone_info
zone_info = {}
leases = {}
for l in lease_rows:
leases[l['zone']] = l['active_leases']
for r in records:
zone = r['zone']
zone_info[zone] = {'spaces': r['spaces'], 'type': 'On street' if r['type'] == 'on-street' else 'Lot'}
if zone in leases.keys():
try:
zone_info[zone]['leases'] = int(leases[zone])
except:
pass
try:
zone_info[zone]['spaces'] = int(zone_info[zone]['spaces'])
# Subtract lease counts from the number of available spots in each parking lot in get_zone_info to provide a more accurate estimate of percent_occupied.
if 'leases' in zone_info[zone]:
zone_info[zone]['spaces'] -= zone_info[zone]['leases']
except:
pass
if zone in temp_zone_info.keys():
zone_info[zone]['latitude'] = temp_zone_info[zone]['Latitude']
zone_info[zone]['longitude'] = temp_zone_info[zone]['Longitude']
# Convert to a list-of-dicts structure for caching to a file:
list_of_ds = [{'zone': k, **v} for k,v in zone_info.items()]
keys = sorted(list_of_ds[0].keys())
# Now cache the resulting zone_info in case it can't be retrieved.
write_or_append_to_csv(zone_info_cache_file,list_of_ds,keys,actually_overwrite=True)
return zone_info
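# Illustrative sketch (not part of the original module): the shape of the
# dictionary get_zone_info returns, with made-up spot and lease counts. The
# 'spaces' value already has the lease count subtracted, and the coordinates
# are the ones temp_zone_info holds for this zone.
_example_zone_info = {  # hypothetical values, safe to delete
    '401 - Downtown 1': {'spaces': 286, 'type': 'On street', 'leases': 12,
                         'latitude': 40.441775562513982,
                         'longitude': -79.998573266419925},
}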
def round_time(dt=None, round_to=60, method="half up"):
"""Round a datetime object to any time laps[e] in seconds
dt : datetime.datetime object, default now.
roundTo : Closest number of seconds to round to, default 1 minute.
Author: Thierry Husson 2012 - Use it as you want but don't blame me.
Modified by drw 2018 with help from https://stackoverflow.com/a/32547090
"""
if dt == None : dt = datetime.now()
seconds = (dt.replace(tzinfo=None) - dt.min).seconds
    if method == 'half up': # Round to the nearest value,
                            # breaking ties by rounding 0.5 up to 1.
rounding = (seconds+round_to/2) // round_to * round_to
elif method == 'down':
rounding = seconds // round_to * round_to
else:
raise ValueError("round_time doesn't know how to round {}".format(method))
return dt + timedelta(0,rounding-seconds,-dt.microsecond)
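# Illustrative sketch (not part of the original module): a quick check of how
# round_time behaves with a 10-minute rounding interval. The input datetime is
# an arbitrary example; only the rounding arithmetic above is being shown.
def _demo_round_time():  # hypothetical helper, safe to delete
    dt = datetime(2018, 7, 1, 1, 57, 30)
    assert round_time(dt, round_to=600, method='half up') == datetime(2018, 7, 1, 2, 0, 0)
    assert round_time(dt, round_to=600, method='down') == datetime(2018, 7, 1, 1, 50, 0)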
def is_very_beginning_of_the_month(dt):
return dt.day == 1 and dt.hour == 0 and dt.minute == 0 and dt.second == 0 and dt.microsecond == 0
def beginning_of_day(dt=None):
"""Takes a datetime and returns the first datetime before
that that corresponds to LOCAL midnight (00:00).
This function is time-zone agnostic."""
# Using this function may have been part of the DST-related problems. Hence the
# creation of localized_beginning_of_day below.
if dt == None : dt = datetime.now()
return dt.replace(hour=0, minute=0, second=0, microsecond=0)
def localized_beginning_of_day(local_tz,dt=None):
"""Takes a datetime and returns the first datetime before
that that corresponds to LOCAL midnight (00:00).
This function is time-zone agnostic."""
# Note that this function was created as one attempt to fix the DST-related
# problems, but now that more times have been switched to using UTC time,
# it seems to be unneeded.
if dt == None : dt = datetime.now()
return (local_tz.normalize(dt)).replace(hour=0, minute=0, second=0, microsecond=0)
def terminal_of(p,t_guids,terminals):
t = terminals[t_guids.index(p['@TerminalGuid'])]
return t
def add_duration(p,raw_only=False):
if raw_only:
p['Duration'] = None # in minutes
else:
p['Duration'] = int(p['@Units']) # in minutes
return p
def fix_one_duration(p,session,raw_only=False):
    add_duration(p, raw_only)
    try: # Sort transactions by EndDateUtc.
ps = sorted(session, key=lambda x: x['@EndDateUtc'])[::-1]
#pprint(ps)
except:
print("len(session) = {}".format(len(session)))
for e in session:
if '@EndDateUtc' not in e:
print("Missing '@EndDateUtc':")
pprint(to_dict(e))
raise ValueError("Found a transaction that is missing @EndDateUtc.")
raise ValueError("Unable to sort session transactions.")
# Find p in the sorted session list.
k = 0
while ps[k] != p:
k += 1
assert ps[k] == p
if len(ps) > 1 and k+1 != len(ps):
p['Duration'] -= int(ps[k+1]['@Units'])
if p['Duration'] < 0:
pprint(ps)
pprint(p)
raise ValueError('Negative duration encountered.')
elif 'Duration' not in p:
p['Duration'] = int(p['@Units'])
# Now that each purchase has an associated duration, calculate the true start of the corresponding
# parking segment (when the car has parked, not when it has paid).
p['parking_segment_start_utc'] = parking_segment_start_of(p)
# This is a costly operation, so really calculating Durations and finding the true pay interval bounds should be
# done when the data is first pulled and stored in the local cache.
p['segment_number'] = len(ps)-k-1
#print("Durations: {}, @Units: {}".format([e['Duration'] if 'Duration' in e else None for e in ps], [int(e['@Units']) for e in ps]))
def fix_durations(session,raw_only=False):
"""This function accepts a pre-grouped set of purchases that comprise
a single car's parking session. This is typically one to five transactions.
These purchases can be assigned new fields (like 'Duration') which are
computed by examining other purchases to infer the real parking time
associated with each transaction."""
try: # Sort transactions by EndDateUtc.
ps = sorted(session, key=lambda x: x['@EndDateUtc'])[::-1]
except:
print("len(session) = {}".format(len(session)))
for e in session:
if '@EndDateUtc' not in e:
print("Missing '@EndDateUtc':")
pprint(to_dict(e))
raise ValueError("Found a transaction that is missing @EndDateUtc.")
raise ValueError("Unable to sort session transactions.")
for p in ps:
if 'Duration' not in p:
add_duration(p,raw_only)
else:
print("p with pre-existing Duration field found in fix_durations.")
for k,p in enumerate(ps):
# Subtract the durations of the previous payments.
# If the minutes purchased are 10, 30, 5,
# the Units fields will have values 10, 40, and 45, when
# sorted in chronological order.
# Reversing this process, we start with the most recent
# transaction (45) and subtract the previous (40) to
# get the minutes purchased (5).
if k+1 != len(ps):
# Subtract off the cumulative minutes purchased by the most recent
# predecessor, so that the Duration field represents just the Duration
# of this transaction. (Duration is the incremental number of minutes
# purchased, while the '@Units' field is the CUMULATIVE number of
# minutes.)
p['Duration'] -= int(ps[k+1]['@Units'])
# Now that each purchase has an associated duration, calculate the true start of the corresponding
# parking segment (when the car has parked, not when it has paid).
if p['Duration'] < 0:
pprint(session)
pprint(p)
raise ValueError('Negative duration encountered.')
p['parking_segment_start_utc'] = parking_segment_start_of(p)
p['segment_number'] = len(ps)-k-1
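# Illustrative sketch (not part of the original module): the cumulative-to-
# incremental conversion performed by fix_durations, shown in isolation with
# made-up numbers. For cumulative '@Units' values of 10, 40 and 45 minutes (in
# chronological order), walking the list most-recent-first and subtracting each
# predecessor's cumulative total yields incremental durations of 5, 30 and 10.
def _demo_incremental_durations():  # hypothetical helper, safe to delete
    cumulative_units = [10, 40, 45]  # chronological order
    ps = cumulative_units[::-1]      # most recent first, as in fix_durations
    durations = [ps[k] - (ps[k + 1] if k + 1 != len(ps) else 0) for k in range(len(ps))]
    assert durations == [5, 30, 10]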
def hash_reframe(p,terminals,t_guids,hash_history,previous_history,uncharted_n_zones,uncharted_e_zones,turbo_mode,raw_only,transactions_only,extend=True):
"""Take a dictionary and generate a new dictionary from it that samples
the appropriate keys and renames and transforms as desired.
In contrast with reframe, which used ps_dict with its uncertain linking between transactions,
hash_reframe is hashing unique identifiers to take the guesswork out of linking transactions
into sessions (at the cost of preventing past transactions from being linked)."""
row = {}
#row['GUID'] = p['@PurchaseGuid'] # To enable this field,
# get_batch_parking_for_day needs to be tweaked and
# JSON caches need to be regenerated.
try:
row['TerminalGUID'] = p['@TerminalGuid'] # This is useful
# for connecting purchases with terminals when the ID changes
# but the GUID does not change.
except:
print("p['@TerminalGuid'] is missing from {}".format(p))
row['TerminalID'] = p['@TerminalID']
if p['@TerminalGuid'] in t_guids:
t = terminals[t_guids.index(p['@TerminalGuid'])]
if extend:
row['Latitude'] = value_or_blank('Latitude',t)
row['Longitude'] = value_or_blank('Longitude',t)
# Maybe these should be value_or_none instead.
row['List_of_sampling_groups'] = sampling_groups(t,uncharted_n_zones,uncharted_e_zones)
row['Amount'] = float(p['@Amount'])
if not transactions_only:
if 'Duration' in p:
row['Duration'] = p['Duration']
else:
row['Duration'] = None
row['Is Mobile Payment'] = is_mobile_payment(p)
return row
def find_biggest_value(d_of_ds,field='transactions'):
return sorted(d_of_ds,key=lambda x:d_of_ds[x][field])[-1]
def update_occupancies(inferred_occupancy,stats_by_zone,slot_start,timechunk):
"""This function uses the parking durations inferred by trying to piece
together sessions from individual transactions to synthesize an
estimated count of parked cars for each zone and time chunk,
starting at slot_start and going forward.
No correction factors have been applied yet."""
delta_minutes = timechunk.total_seconds()/60.0
for zone in stats_by_zone:
#durations = json.loads(stats_by_zone[zone]['Durations']) # No longer necessary
# since this field is going to be a list of integers until package_for_output
# is called.
durations = stats_by_zone[zone]['Durations']
# if len(durations) > 0:
# print "\ndurations for zone {} = ".format(zone)
# pprint(durations)
for d_i in durations:
bins = int(round(float(d_i)/delta_minutes))
            # Rounding like this means that for a timechunk of 10 minutes,
            # 1-4 minute parking sessions will not add to inferred
            # occupancy, while 6-14 minute sessions will add ten minutes
            # of apparent occupancy. (Python 3's round() breaks ties toward
            # the even integer, so a 5-minute session rounds to zero bins
            # and a 15-minute session rounds to two.) This will work
            # perfectly if the timechunk is one minute (i.e., no occupancy
            # will be lost due to rounding errors).
            # [ ] What if instead of rounding, we use fractional cars?
            # A car that is parked for 3 minutes out of a 10-minute
            # slot is 0.3 cars. But for this to really work well, we'd
            # need to know not just the durations for each slot, but
            # the start time for each duration. Using rounded occupancies
            # is good for generating occupancy estimates from
            # aggregated statistics.
# [ ] Compare this method to an exact transaction-by-transaction
# calculation of occupancy.
for k in range(0,bins):
inferred_occupancy[slot_start+k*timechunk][zone] += 1
# inferred_occupancy is measured in cars (or used parking spaces),
# though a more useful metric would be percent_occupied.
# if len(durations) > 0:
# print "inferred_occupancy for zone {} =".format(zone)
# for t in sorted(inferred_occupancy.keys()):
# print t, to_dict(inferred_occupancy[t])
return inferred_occupancy
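# Illustrative sketch (not part of the original module): how a single duration
# is converted to a number of 10-minute occupancy bins by the rounding above.
# Note that round() in Python 3 rounds exact halves to the nearest even
# integer, so a 5-minute duration maps to 0 bins and a 15-minute one to 2.
def _demo_occupancy_bins(duration_minutes, timechunk=DEFAULT_TIMECHUNK):  # hypothetical helper, safe to delete
    delta_minutes = timechunk.total_seconds()/60.0
    return int(round(float(duration_minutes)/delta_minutes))
# _demo_occupancy_bins(4) == 0, _demo_occupancy_bins(14) == 1, _demo_occupancy_bins(25) == 2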
def format_a_key(meter_id,year,month,hour):
# return "{}|{}/{} {:02d}".format(meter_guid,year,month,hour)
return "{}/{} {:02d}|{}".format(year,month,hour,meter_id)
def initialize_zone_stats(start_time,end_time,space_aggregate_by,time_aggregate_by,split_by_mode,tz=pytz.timezone('US/Eastern'), transactions_only=True):
stats = {}
# This is where it would be nice to maybe do some different formatting based on the
# time_aggregation parameter (since now a bin is not defined just by start and end
# but also by year-month. The other possibility would be to do it when the month
# is archived (from the loop in main()).
start_time_local = start_time.astimezone(tz)
stats['start'] = datetime.strftime(start_time_local,"%Y-%m-%d %H:%M:%S")
# [ ] Is this the correct start time?
end_time_local = end_time.astimezone(tz)
stats['end'] = datetime.strftime(end_time_local,"%Y-%m-%d %H:%M:%S")
start_time_utc = start_time.astimezone(pytz.utc)
stats['utc_start'] = datetime.strftime(start_time_utc,"%Y-%m-%d %H:%M:%S")
if not split_by_mode:
stats['transactions'] = 0
stats['Payments'] = 0.0
else:
stats['meter_transactions'] = 0
stats['Meter Payments'] = 0.0
stats['mobile_transactions'] = 0
stats['Mobile Payments'] = 0.0
if not transactions_only:
stats['car_minutes'] = 0
stats['Durations'] = [] # The Durations field represents the durations of the purchases
# made during this time slot. Just as transactions indicates how many times people
# put money into parking meters (or virtual meters via smartphone apps) and
# Payments tells you how much money was paid, Durations tells you the breakdown of
# parking minutes purchased. The sum of all the durations represented in the
# Durations list should equal the value in the car_minutes field. This field has been
# changed to a list data structure until package_for_output, at which point it is
# reformatted into a dictionary.
if space_aggregate_by == 'sampling zone':
stats['parent_zone'] = None
if time_aggregate_by == 'month':
stats['Year'] = start_time.astimezone(tz).strftime("%Y")
stats['Month'] = start_time.astimezone(tz).strftime("%m")
stats['Hour'] = start_time.astimezone(tz).strftime("%-H")
stats['UTC Hour'] = start_time.astimezone(pytz.utc).strftime("%-H")
return stats
def distill_stats(rps,terminals,t_guids,t_ids,group_lookup_addendum,start_time,end_time, stats_by={},zone_kind='old', space_aggregate_by='zone', time_aggregate_by=None, split_by_mode=False, parent_zones=[], tz=pytz.timezone('US/Eastern'), transactions_only=True):
# Originally this function just aggregated information
# between start_time and end_time to the zone level.
# Then it was modified to support sampling zones,
# allowing the function to be called separately just to
# get sampling-zone-level aggregation.
# THEN it was modified to also allow aggregation by
# meter ID instead of only by zone.
# If 'Duration' does not have a non-None value in any of the rps,
# distill_stats will not add Durations and car-minutes fields.
global global_warnings
for k,rp in enumerate(rps):
t_guid = rp['TerminalGUID']
t_id = rp['TerminalID']
zone = None
space_aggregation_keys = []
aggregation_keys = []
if space_aggregate_by == 'zone':
if t_guid in t_guids:
t = terminals[t_guids.index(t_guid)]
else:
t = None
if zone_kind == 'new':
zone, _, _ = numbered_zone(t_id, t, group_lookup_addendum)
elif t is not None:
if zone_kind == 'old':
zone = corrected_zone_name(t) # Changed
# from zone_name(t) to avoid getting
# transactions in "Z - Inactive/Removed Terminals".
else:
print("OH NO!!!!!!!!!!!\n THE TERMINAL FOR THIS PURCHASE CAN NOT BE FOUND\n BASED ON ITS GUID!!!!!!!!!!!!!!!")
if zone_kind == 'old':
zone = corrected_zone_name(None,t_ids,rp['TerminalID'])
            if zone is not None: # zone can be None if a numbered zone cannot be identified, meaning that
                # space_aggregation_keys will be empty, and the transaction will not have a bin to
                # be published in.
space_aggregation_keys = [zone]
elif space_aggregate_by == 'sampling zone':
if 'List_of_sampling_groups' in rp and rp['List_of_sampling_groups'] != []:
space_aggregation_keys = rp['List_of_sampling_groups']
# The problem with this is that a given purchase is associated with a terminal which may have MULTIPLE sampling zones. Therefore, each sampling zone must have its own parent zone(s).
elif space_aggregate_by == 'meter':
space_aggregation_keys = [t_id] # Should this be GUID or just ID? ... Let's
# make it GUID (as it will not change), but store meter ID as
# an additional field
# I've decided to switch to ID for nicer sorting, and because
# maybe it actually makes a little more sense to highlight the changes
# associated with an ID change. (Sometimes this is fixing a typo or
# a small change but it might be a larger change. In any event,
# the user would have both ID and GUID in this meter-month-hour
# aggregation mode.
if space_aggregation_keys != []:
space_aggregation_keys = censor(space_aggregation_keys,space_aggregate_by) # The censor function filters out
# forbidden zones, both for regular zones and sampling zones and requires that sampling zones be
# pre-approved (that is, in the designated_minizones list).
if time_aggregate_by is None:
aggregation_keys = space_aggregation_keys
elif time_aggregate_by == 'month':
time_key = start_time.astimezone(tz).strftime("%Y/%m %H") #+ "-"+ start_time.astimezone(pytz.utc).strftime("%H")
# [X] Actually, do we really need to distinguish between the two 1-2am slots on the one day of the year when
# the clocks shift back? Maybe we can just combine those together.
year = start_time.astimezone(tz).strftime("%Y")
month = start_time.astimezone(tz).strftime("%m")
hour = int(start_time.astimezone(tz).strftime("%H"))
#aggregation_keys = [s_a_k + "|" + time_key for s_a_k in space_aggregation_keys]
aggregation_keys = [format_a_key(s_a_k,year,month,hour) for s_a_k in space_aggregation_keys]
#time_key = start_time.astimezone(tz).strftime("%Y/%m %H") + "-"+ start_time.astimezone(pytz.utc).strftime("%H")
#aggregation_keys = [(s_a_k, time_key) for s_a_k in space_aggregation_keys if not_censored(s_a_k)]
# The above could really stand to be refactored.
if aggregation_keys != []:
for a_key in aggregation_keys:
zone = a_key.split('|')[0]
if a_key not in stats_by:
stats_by[a_key] = initialize_zone_stats(start_time,end_time,space_aggregate_by,time_aggregate_by,split_by_mode, tz=pytz.timezone('US/Eastern'), transactions_only=transactions_only)
stats_by[a_key]['zone'] = zone
if space_aggregate_by == 'sampling zone':
if 'parent_zone' in stats_by[a_key]:
#for zone in space_aggregation_keys:
# There are now cases where getting the zone from space_aggregation_keys
# for space_aggregate_by == 'sampling zone' results in multiple zones
# since the value comes from rp['List_of_sampling_groups']. Basically,
                        # a terminal group can be assigned to an arbitrary number of Terminal
# Groups, and we are getting the ones that are not sampling zones,
# so one terminal can be both in 'CMU Study' and 'Marathon/CMU', for
# instance.
#
# It seems like the correct thing to do in this case is add the
# transactions to both sampling zones.
# This should actually happen naturally if the space part of the
# aggregation key could be pulled off and used as the zone in
# each case, which is what I've done.
# This output seems to be the same as before space-time aggregation
# was added.
if zone in parent_zones:
stats_by[a_key]['parent_zone'] = '|'.join(parent_zones[zone])
else:
msg = "sampling zone = {} is not listed in parent_zones, though this may just be because process_data.py is working off of a cached file.".format(zone)
print(msg)
global_warnings[msg] += 1
stats_by[a_key]['parent_zone'] = ''
elif space_aggregate_by == 'meter':
stats_by[a_key]['Meter GUID'] = t_guid
stats_by[a_key]['Meter ID'] = t_id
nz, _, _ = numbered_zone(t_id,None,group_lookup_addendum)
stats_by[a_key]['zone'] = nz
if not split_by_mode:
stats_by[a_key]['transactions'] += 1
stats_by[a_key]['Payments'] += rp['Amount']
else: # Split payments into mobile and meter payments
if rp['Is Mobile Payment']:
stats_by[a_key]['mobile_transactions'] += 1
stats_by[a_key]['Mobile Payments'] += rp['Amount']
else:
stats_by[a_key]['meter_transactions'] += 1
stats_by[a_key]['Meter Payments'] += rp['Amount']
if not transactions_only and 'Duration' in rp and rp['Duration'] is not None:
stats_by[a_key]['car_minutes'] += rp['Duration']
stats_by[a_key]['Durations'].append(rp['Duration'])
return stats_by
def build_url(base_url,slot_start,slot_end):
"""This function takes the bounding datetimes, checks that
they have time zones, and builds the appropriate URL,
converting the datetimes to UTC (which is what the CALE
API expects).
This function is called by get_batch_parking_for_day
(and was also used by get_recent_parking_events)."""
if is_timezoneless(slot_start) or is_timezoneless(slot_end):
raise ValueError("Whoa, whoa, whoa! One of those times is unzoned!")
# Since a slot_end that is too far in the future results
# in a 400 (reason = "Bad Request"), limit how far in
# the future slot_end may be
arbitrary_limit = datetime.now(pytz.utc) + timedelta(hours = 1)
if slot_end.astimezone(pytz.utc) > arbitrary_limit:
slot_end = arbitrary_limit
date_format = '%Y-%m-%d'
time_format = '%H%M%S'
url_parts = [slot_start.astimezone(pytz.utc).strftime(date_format),
slot_start.astimezone(pytz.utc).strftime(time_format),
slot_end.astimezone(pytz.utc).strftime(date_format),
slot_end.astimezone(pytz.utc).strftime(time_format)]
url = base_url + '/'.join(url_parts)
return url
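# Illustrative sketch (not part of the original module): the URL shape that
# build_url produces. The base URL and the Eastern-time bounds are example
# values only (July dates fall in EDT, i.e., UTC-4).
def _demo_build_url():  # hypothetical helper, safe to delete
    eastern = pytz.timezone('US/Eastern')
    start = eastern.localize(datetime(2018, 7, 1, 0, 0, 0))
    end = eastern.localize(datetime(2018, 7, 2, 0, 0, 0))
    url = build_url('https://example.com/purchase/ticket/', start, end)
    assert url == 'https://example.com/purchase/ticket/2018-07-01/040000/2018-07-02/040000'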
def convert_doc_to_purchases(doc,slot_start,date_format):
if 'Purchases' not in doc:
print("Failed to retrieve records for UTC time {}".format(slot_start.astimezone(pytz.utc).strftime(date_format)))
# Possibly an exception should be thrown here.
return None
if doc['Purchases']['@Records'] == '0':
return []
ps = doc['Purchases']['Purchase']
if type(ps) == list:
#print "Found a list!"
return ps
if type(ps) == type(OrderedDict()):
#print "It's just one OrderedDict. Let's convert it to a list!"
return [ps]
print("Found something else of type {}".format(type(ps)))
return []
def cull_fields(ps):
"""Remove a bunch of unneeded fields."""
purchases = remove_field(ps,'@Code')
purchases = remove_field(purchases,'@ArticleID')
purchases = remove_field(purchases,'@ArticleName')
purchases = remove_field(purchases,'@CurrencyCode')
purchases = remove_field(purchases,'@VAT')
# Other fields that could conceivably be removed:
# @ExternalID, @PurchaseStateName, some fields in PurchasePayUnit, maybe others
# Filtering out a lot more fields to try to slim down the amount of data:
#purchases = remove_field(purchases,'@PurchaseGuid')
#purchases = remove_field(purchases,'@TerminalGuid')
#purchases = remove_field(purchases,'@PurchaseDateUtc')#
#purchases = remove_field(purchases,'@PayIntervalStartLocal')#
#purchases = remove_field(purchases,'@PayIntervalStartUtc')#
#purchases = remove_field(purchases,'@PayIntervalEndLocal')#
#purchases = remove_field(purchases,'@PayIntervalEndUtc')#
#purchases = remove_field(purchases,'@EndDateLocal')
#purchases = remove_field(purchases,'@EndDateUtc')#
#purchases = remove_field(purchases,'@PaymentServiceType')
#purchases = remove_field(purchases,'@TicketNumber') # Commented out 2019-01-28
#purchases = remove_field(purchases,'@TariffPackageID') # Commented out 2019-01-28
#purchases = remove_field(purchases,'@ExternalID') # Commented out 2019-01-28
#purchases = remove_field(purchases,'@PurchaseStateName')
purchases = remove_field(purchases,'@PurchaseTriggerTypeName')
#purchases = remove_field(purchases,'@PurchaseTypeName')#
purchases = remove_field(purchases,'@MaskedPAN','PurchasePayUnit')
purchases = remove_field(purchases,'@BankAuthorizationReference','PurchasePayUnit')
purchases = remove_field(purchases,'@CardFeeAmount','PurchasePayUnit')
purchases = remove_field(purchases,'@PayUnitID','PurchasePayUnit')
#purchases = remove_field(purchases,'@TransactionReference','PurchasePayUnit')
purchases = remove_field(purchases,'@CardIssuer','PurchasePayUnit')
return purchases
def get_doc_from_url(url):
r = requests.get(url, auth=(CALE_API_user, CALE_API_password))
    if r.status_code == 403: # 403 = Forbidden, meaning that the CALE API
# has decided to shut down for a while (maybe for four hours
# after the last query of historical data).
raise RuntimeError("The CALE API is returning a 403 Forbidden error, making it difficult to accomplish anything.")
# Convert Cale's XML into a Python dictionary
doc = xmltodict.parse(r.text,encoding = r.encoding)
try:
# This try-catch clause is only protecting one of the three cases where
# the function is reading into doc without being sure that the fields
# are there (occasionally in practice they are not because of unknown
# stuff on the API end).
# The next time such an exception is thrown, it might make sense to
# look at what has been printed from doc and maybe put the call
# to get_doc_from_url into a try-catch clause.
url2 = doc['BatchDataExportResponse']['Url']
except:
pprint(doc)
print("Unable to get the first URL by using the command url2 = doc['BatchDataExportResponse']['Url'].")
print("Waiting 10 seconds and restarting.")
time.sleep(10)
return None, False
r2 = requests.get(url2, auth=(CALE_API_user, CALE_API_password))
if r2.status_code == 403:
raise RuntimeError("The CALE API is returning a 403 Forbidden error, making it difficult to accomplish anything.")
doc = xmltodict.parse(r2.text,encoding = r2.encoding)
delays = 0
while not r2.ok or doc['BatchDataExportFileResponse']['ExportStatus'] != 'Processed':
time.sleep(10)
r2 = requests.get(url2, auth=(CALE_API_user, CALE_API_password))
doc = xmltodict.parse(r2.text,encoding = r2.encoding)
delays += 1
if delays % 5 == 0:
print("|", end="", flush=True)
else:
print(".", end="", flush=True)
if delays > 30:
return None, False
url3 = doc['BatchDataExportFileResponse']['Url']
# When the ZIP file is ready:
delays = 0
r3 = requests.get(url3, stream=True, auth=(CALE_API_user, CALE_API_password))
if r3.status_code == 403:
raise RuntimeError("The CALE API is returning a 403 Forbidden error, making it difficult to accomplish anything.")
while not r3.ok and delays < 20:
time.sleep(5)
r3 = requests.get(url3, stream=True, auth=(CALE_API_user, CALE_API_password))
delays += 1
if delays % 5 == 0:
print("|", end="", flush=True)
else:
print(",", end="", flush=True)
if delays > 30:
return None, False
z = zipfile.ZipFile(BytesIO(r3.content))
time.sleep(0.5)
# Extract contents of a one-file zip file to memory:
xml = z.read(z.namelist()[0])
doc = xmltodict.parse(xml,encoding = 'utf-8')
return doc, True
def get_day_from_json_or_api(slot_start,tz,cache=True,mute=False,utc_json_folder='utc_json'):
"""Caches parking once it's been downloaded and checks
cache before redownloading.
Note that no matter what time of day is associated with slot_start,
this function will get all of the transactions for that entire day.
Filtering the results down to the desired time range is handled
elsewhere (in the calling function (e.g., get_utc_ps_for_day_from_json)).
Caching by date ties this approach to a particular time zone. This
is why transactions are dropped if we send this function a UTC
slot_start (I think).
This function seems to give the same result whether slot_start is
localized for UTC or Eastern, so long as tz is pytz.utc."""
date_format = '%Y-%m-%d'
slot_start = slot_start.astimezone(tz) # slot_start needs to already
# have a time zone associated with it. This line forces slot_start to be
# in timezone tz, even if it wasn't before.
dashless = slot_start.strftime('%y%m%d')
if tz == pytz.utc:
filename = path + utc_json_folder + "/" + dashless + ".json"
else:
filename = path + "json/"+dashless+".json"
too_soon = slot_start.date() >= datetime.now(tz).date()
# If the day that is being requested is today, definitely don't cache it.
recent = datetime.now(tz) - slot_start <= timedelta(days = 5) # This
# definition of recent is a little different since a) it uses slot_start
# rather than slot_end (which is fine here, as we know that slot_start
# and slot_end are separated by one day) and b) it uses the time zone tz
# (though that should be fine since slot_start has already been converted
# to time zone tz).
if not os.path.isfile(filename) or os.stat(filename).st_size == 0:
if not mute:
print("Sigh! {} not found, so I'm pulling the data from the API...".format(filename))
slot_start = beginning_of_day(slot_start)
slot_end = slot_start + timedelta(days = 1)
if recent:
base_url = f'{BASE_URL}LiveDataExport/4/LiveDataExportService.svc/purchases/'
else:
base_url = f'{BASE_URL}BatchDataExport/4/BatchDataExport.svc/purchase/ticket/'
url = build_url(base_url,slot_start,slot_end)
if not mute:
print("Here's the URL: {}".format(url))
if recent:
# [ ] pull_from_url currently has a different retry-on-failure model
# than get_doc_from_url (which is wrapped in a while loop so it will
# either succeed or keep trying forever, while pull_from_url eventually
# gives up).
r = pull_from_url(url)
doc = xmltodict.parse(r.text,encoding = 'utf-8')
ps = convert_doc_to_purchases(doc,slot_start,date_format)
else:
downloaded = False
while not downloaded:
doc, downloaded = get_doc_from_url(url)
print("!", end="", flush=True)
ps = convert_doc_to_purchases(doc['BatchExportRoot'],slot_start,date_format)
ps = add_hashes(ps)
purchases = cull_fields(ps)
#print("cache = {}, recent = {}, too_soon = {}".format(cache,recent,too_soon))
if cache and not too_soon:
# Check if directory exists.
directory = '/'.join(filename.split('/')[:-1])
if not os.path.isdir(directory):
os.mkdir(directory)
# Caching data from the LiveDataExport endpoint (but not today's data) is an interesting experiment.
with open(filename, "w") as f:
json.dump(purchases,f,indent=2)
if recent:
print(" !!!!!!!!!!! Cached some data from the LiveDataExport endpoint in {}".format(filename))
else: # Load locally cached version
with open(filename, "r", encoding="utf-8") as f:
ps = json.load(f)
return ps
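# Illustrative sketch (not part of the original module): the cache-file naming
# scheme used above, reimplemented in isolation. 'path' comes from
# parameters.local_parameters; the date below is an arbitrary example.
def _demo_cache_filename(slot_start, tz, utc_json_folder='utc_json'):  # hypothetical helper, safe to delete
    dashless = slot_start.astimezone(tz).strftime('%y%m%d')
    folder = utc_json_folder if tz == pytz.utc else 'json'
    return path + folder + '/' + dashless + '.json'
# e.g., _demo_cache_filename(pytz.utc.localize(datetime(2018, 7, 1)), pytz.utc)
# gives path + 'utc_json/180701.json'.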
def get_batch_parking_for_day(slot_start,tz,cache=True,mute=False,utc_json_folder="utc_json"):
"""Caches parking once it's been downloaded and checks
cache before redownloading.
Note that no matter what time of day is associated with slot_start,
this function will get all of the transactions for that entire day.
Filtering the results down to the desired time range is handled
elsewhere (in the calling function (get_batch_parking)).
Caching by date ties this approach to a particular time zone. This
is why transactions are dropped if we send this function a UTC
slot_start (I think) and try to use the Eastern Time Zone JSON
files. This has been fixed by specifying the timezone
and distinguishing between JSON-file folders."""
ps = get_day_from_json_or_api(slot_start,tz,cache,mute,utc_json_folder)
return ps
def get_batch_parking(slot_start,slot_end,cache,mute=False,utc_json_folder="utc_json",tz=pytz.timezone('US/Eastern'),time_field = '@PurchaseDateLocal',dt_format='%Y-%m-%dT%H:%M:%S'):
"""This function handles the situation where slot_start and slot_end are on different days
by calling get_batch_parking_for_day in a loop.
The parameter "time_field" determines which of the timestamps is used for calculating
the datetime values used to filter purchases down to those between slot_start
    and slot_end.
Note that the time zone tz and the time_field must be consistent for this to work properly."""
# Here is a little sanity check:
if (re.search('Utc',time_field) is not None) != (tz == pytz.utc):
# This does an XOR between these values.
raise RuntimeError("It looks like time_field may not be consistent with the provided time zone")
global last_date_cache, all_day_ps_cache, dts_cache
if last_date_cache != slot_start.date():
if not mute:
print("last_date_cache ({}) doesn't match slot_start.date() ({})".format(last_date_cache, slot_start.date()))
ps_all = []
dt_start_i = slot_start
while dt_start_i < slot_end:
            ps_for_whole_day = get_batch_parking_for_day(dt_start_i,tz,cache,mute,utc_json_folder)
ps_all += ps_for_whole_day
dt_start_i += timedelta(days = 1)
if not mute:
print("Now there are {} transactions in ps_all.".format(len(ps_all)))
all_day_ps_cache = ps_all # Note that if slot_start and slot_end are not on the same day,
# all_day_ps_cache will hold transactions for more than just the date of slot_start, but
# since filtering is done further down in this function, this should not represent a
# problem. There should be no situations where more than two days of transactions will
# wind up in this cache at any one time.
dts_cache = [tz.localize(parser.parse(p[time_field])) for p in ps_all]
time.sleep(3)
else:
ps_all = all_day_ps_cache
#ps = [p for p in ps_all if slot_start <= tz.localize(datetime.strptime(p[time_field],'%Y-%m-%dT%H:%M:%S')) < slot_end] # This takes like 3 seconds to
# execute each time for busy days since the time calculations
# are on the scale of tens of microseconds.
# So let's generate the datetimes once (above), and do
# it this way:
ps = [p for p,dt in zip(ps_all,dts_cache) if slot_start <= dt < slot_end]
# Now instead of 3 seconds it takes like 0.03 seconds.
last_date_cache = slot_start.date()
return ps
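# Illustrative sketch (not part of the original module): the precompute-then-zip
# filtering pattern used above, applied to two made-up purchase records. Parsing
# each timestamp once and reusing the datetimes keeps repeated slot filters cheap.
def _demo_filter_by_slot():  # hypothetical helper, safe to delete
    eastern = pytz.timezone('US/Eastern')
    ps_all = [{'@PurchaseDateLocal': '2018-07-01T09:05:00'},
              {'@PurchaseDateLocal': '2018-07-01T10:15:00'}]
    dts = [eastern.localize(parser.parse(p['@PurchaseDateLocal'])) for p in ps_all]
    slot_start = eastern.localize(datetime(2018, 7, 1, 9, 0, 0))
    slot_end = slot_start + DEFAULT_TIMECHUNK
    assert [p for p, dt in zip(ps_all, dts) if slot_start <= dt < slot_end] == ps_all[:1]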
########################
def get_payment_type(p):
# If the 'PurchasePayUnit' field cannot be found, use the terminal ID
# to detect whether it's a virtual payment.
if 'PurchasePayUnit' not in p:
terminal_id = p['@TerminalID']
if terminal_id[:3] == 'PBP':
return 'mobile'
        elif re.match(r"^\d\d\d\d$", terminal_id) is not None or re.match(r"^\d\d\d\d\d$", terminal_id) is not None:
# Terminals with IDs like 5593.
            # These usually have PurchasePayUnit fields, and every one I've checked has been mobile.
return 'mobile'
elif terminal_id[0] in ['2', '3', '4']:
return 'meter'
elif terminal_id[:4] == 'MTFD':
# Observations: Where there are coordinates for MTFD terminals, they appear
# to be the locations of those blue boxes.
# All MTFD transactions appear to be mobile transactions, so let's assume
# that the payment type is mobile.
return 'mobile'
else:
pprint(p)
raise ValueError("Unknown terminal type for terminal ID {} from payment {}.".format(terminal_id,p))
if type(p['PurchasePayUnit']) == list: # It's a list of Coin and Card payments.
return 'meter'
pay_unit_name = p['PurchasePayUnit']['@PayUnitName']
if pay_unit_name == 'Mobile Payment':
return 'mobile'
else: # In addition to "Mobile Payment" and "Coin" and "Card", there's also now "Manual", which is ignorable.
if pay_unit_name == 'Manual':
return 'manual'
elif pay_unit_name in ['Coin', 'Card', 'None']:
return 'meter'
else:
raise ValueError("Unknown payment type for @PayUnitName {} from payment {}.".format(pay_unit_name,p))
def is_mobile_payment(p):
return get_payment_type(p) == 'mobile'
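# Illustrative sketch (not part of the original module): how get_payment_type
# classifies a few minimal, made-up purchase records.
def _demo_payment_types():  # hypothetical helper, safe to delete
    assert get_payment_type({'@TerminalID': 'PBP123'}) == 'mobile'  # mobile-app zone code
    assert get_payment_type({'@TerminalID': '401234'}) == 'meter'   # terminal ID starting with '4'
    assert get_payment_type({'@TerminalID': 'X1',
                             'PurchasePayUnit': {'@PayUnitName': 'Card'}}) == 'meter'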
def hybrid_parking_segment_start_of(p):
# In the CALE API, @Units == @EndDateUtc - @StartDateUtc.
# The same relation does not hold for @PurchaseDateUtc, which in rare
# cases can be on a different day from @StartDateUtc.
# Observed differences have been @PurchaseDateUtc - @StartDateUtc = 86393 seconds
# (in which case @PurchaseDateUtc was close to the delayed @DateCreatedUtc value)
# and -16504 seconds (in which case @StartDateUtc and @DateCreatedUtc were
# pretty consistent, but @PurchaseDateUtc was hours before either).
# For mobile transactions, PurchaseDateUtc == StartDateUtc.
time_field = {'mobile': p['@StartDateUtc'],
'meter': p['@StartDateUtc']}
if is_mobile_payment(p):
payment_type = 'mobile'
else:
return time_field['meter']
assert payment_type == 'mobile'
# Check whether the payment was purchased inside or outside of regular parking hours.
sdl = p['@StartDateLocal']
edl = p['@EndDateLocal']
amount = p['@Amount']
units = p['@Units']
if amount == '0': # For mobile transactions only.
return time_field['mobile']