# mapping_util.py
import os
import sys
import time
from datetime import datetime
import glob
import subprocess
import psycopg2 as sql
import sqlparse
from io import StringIO
from concurrent.futures import ProcessPoolExecutor
from concurrent.futures import as_completed
import importlib.util
# ---------------------------------------------------------
def get_parameters():
    "Read user parameters from the command line"
    # ---------------------------------------------------------
    ret = True
    dir_study = ''
    db_conf = None   # Stays None if no -F argument is supplied
    debug = False
    try:
        arg_num = len(sys.argv)
        if arg_num < 2:
            print("Please enter the study folder after the -F parameter, without a space (e.g. -FC:\\my_study)")
        else:
            for i in range(1, arg_num):
                if sys.argv[i].upper()[:2] == "-F":
                    dir_study = sys.argv[i][2:]
                    # Load the study-specific DB configuration module from the study folder
                    module_name = '__postgres_db_conf'
                    spec = importlib.util.spec_from_file_location(module_name, dir_study + '/' + module_name + '.py')
                    module = importlib.util.module_from_spec(spec)
                    sys.modules[module_name] = module
                    spec.loader.exec_module(module)
                    db_conf = module.db_conf
                if sys.argv[i].upper() == "-D":
                    debug = True
    except:
        ret = False
        err = sys.exc_info()
        print("Function = {0}, Error = {1}, {2}".format("get_parameters", err[0], err[1]))
    return(ret, dir_study, db_conf, debug)
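# Example invocation (illustrative; the script name and study folder are assumptions):
#   python run_mapping.py -FC:\studies\my_study -D
# -F points at a folder containing __postgres_db_conf.py (which defines db_conf);
# -D enables debug output.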
# ---------------------------------------------------------
def calc_time(secs_tot):
    "Calculate the time as hours, minutes and seconds from seconds"
    # ---------------------------------------------------------
    (mins, secs) = divmod(secs_tot, 60)
    (hours, mins) = divmod(mins, 60)
    return("{0}h:{1:02d}m:{2:02d}s".format(int(hours), int(mins), int(secs)))
# ---------------------------------------------------------
def does_db_exist(db_conf):
    "Check if the configured database exists"
    # ---------------------------------------------------------
    ret = True
    exist = False
    cnx = None
    cursor1 = None
    try:
        # Connect to the maintenance database to query the catalogue
        cnx = sql.connect(
            user=db_conf['username'],
            password=db_conf['password'],
            database='postgres'
        )
        database = db_conf['database']
        cursor1 = cnx.cursor()
        # Parameterised query avoids quoting problems in the database name
        query1 = "SELECT 1 FROM pg_database WHERE datname = %s"
        cursor1.execute(query1, (database,))
        row = cursor1.fetchone()
        if row is not None:
            exist = True
        cursor1.close()
        cnx.close()
    except:
        ret = False
        err = sys.exc_info()
        print("Function = {0}, Error = {1}, {2}".format("does_db_exist", err[0], err[1]))
        if cursor1 is not None:
            cursor1.close()
        if cnx is not None:
            cnx.close()
    return(ret, exist)
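# Usage sketch (assumes db_conf holds 'username', 'password' and 'database'):
#   ret, exist = does_db_exist(db_conf)
#   if ret and not exist:
#       create_db(db_conf)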
# ---------------------------------------------------------
def create_db(db_conf):
    "Create the configured database"
    # ---------------------------------------------------------
    ret = True
    cnx = None
    cursor1 = None
    try:
        cnx = sql.connect(
            user=db_conf['username'],
            password=db_conf['password'],
            database='postgres'
        )
        cnx.autocommit = True
        database = db_conf['database']
        cursor1 = cnx.cursor()
        # Assumes the tablespace 'tablespace_e' and the role 'dba' already
        # exist on the server, and a Windows locale for collation
        query1 = "CREATE DATABASE " + database + " WITH \
            OWNER = postgres \
            ENCODING = 'UTF8' \
            LC_COLLATE = 'English_United States.1252' \
            LC_CTYPE = 'English_United States.1252' \
            TABLESPACE = tablespace_e \
            CONNECTION LIMIT = -1 \
            IS_TEMPLATE = False;"
        cursor1.execute(query1)
        query1 = "GRANT ALL ON DATABASE " + database + " TO dba"
        cursor1.execute(query1)
        cursor1.close()
        cnx.close()
    except:
        ret = False
        err = sys.exc_info()
        print("Function = {0}, Error = {1}, {2}".format("create_db", err[0], err[1]))
        if cursor1 is not None:
            cursor1.close()
        if cnx is not None:
            cnx.close()
    return(ret)
# ---------------------------------------------------------
def does_tbl_exist(cnx, tbl_name):
    "Check if a table exists in the current database"
    # ---------------------------------------------------------
    ret = True
    exist = False
    cursor1 = None
    try:
        cursor1 = cnx.cursor()
        # Split an optional schema prefix; default to 'public'
        pos = tbl_name.find('.')
        if pos == -1:
            schema_name = 'public'
        else:
            lst = tbl_name.split(".", 1)
            schema_name = lst[0]
            tbl_name = lst[1]
        sql_exec = "SELECT COUNT(*) \
            FROM pg_tables \
            WHERE schemaname = %s \
            AND tablename = %s"
        cursor1.execute(sql_exec, (schema_name, tbl_name))
        row = cursor1.fetchone()
        if row[0] > 0:
            exist = True
        cursor1.close()
    except:
        ret = False
        err = sys.exc_info()
        print("Function = {0}, Error = {1}, {2}".format("does_tbl_exist", err[0], err[1]))
        if cursor1 is not None:
            cursor1.close()
    return(ret, exist)
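# Usage sketch (table name is an illustrative assumption):
#   ret, exist = does_tbl_exist(cnx, 'source.patient')
# The schema defaults to 'public' when tbl_name has no '.' prefix.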
# ---------------------------------------------------------
def load_files(db_conf, schema, tbl_name, file_list, dir_processed, separator, with_quotes, null_string):
    "Load a list of files into one table"
    # ---------------------------------------------------------
    ret = True
    try:
        # ---------------------------------------------------------
        # Connect to db (only if there are files to load)
        # ---------------------------------------------------------
        if file_list:
            cnx = sql.connect(
                user=db_conf['username'],
                password=db_conf['password'],
                database=db_conf['database']
            )
            cnx.autocommit = True
            cursor1 = cnx.cursor()
            cursor1.execute('SET datestyle TO \'' + db_conf['datestyle'] + '\'')
            cursor1.execute('SET search_path TO ' + schema)
            # ---------------------------------------------------------
            time1 = time.time()
            data_provider = db_conf['data_provider']
            # ---------------------------------------------------------
            # Expand any glob patterns into a concrete file list
            file_list_full = []
            for i in range(len(file_list)):
                for f in glob.iglob(file_list[i]):
                    file_list_full.append(f)
            for fname in file_list_full:
                print("File = {0}".format(fname))
                # ---------------------------------------------------------
                # Read the whole file into memory, stripping backslashes,
                # which COPY would otherwise treat as escape characters
                # ---------------------------------------------------------
                stream = StringIO()
                if data_provider == 'ukbiobank':
                    with open(fname, encoding='cp1252', errors='ignore') as f_in:
                        stream.write(f_in.read().replace('\\', ''))
                else:
                    with open(fname, errors='ignore') as f_in:
                        stream.write(f_in.read().replace('\\', ''))
                stream.seek(0)
                stream.readline()  # Skip the header line
                if not with_quotes:
                    cursor1.copy_from(stream, tbl_name, sep=separator, null=null_string)
                else:
                    cursor1.copy_expert("COPY " + tbl_name + " FROM STDIN WITH (FORMAT CSV, delimiter '" + separator + "', quote '\"', NULL '" + null_string + "')", stream)
                # ---------------------------------------------------------
                # Move loaded file to PROCESSED directory
                # ---------------------------------------------------------
                file_processed = dir_processed + os.path.basename(fname)
                os.rename(fname, file_processed)
            cursor1.close()
            cnx.close()
            processing_time = "Files loaded in {0}".format(calc_time(time.time() - time1))
            print(processing_time)
    except:
        ret = False
        err = sys.exc_info()
        print("Function = {0}, Error = {1}, {2}".format("load_files", err[0], err[1]))
    return(ret)
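# Usage sketch (paths and patterns are illustrative assumptions):
#   load_files(db_conf, 'source', 'patient',
#              ['D:\\data\\patient_*.csv'], 'D:\\data\\processed\\',
#              separator=',', with_quotes=True, null_string='')
# Note that dir_processed is prepended to the bare file name, so it should
# end with a path separator.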
# ---------------------------------------------------------
def load_files_parallel(db_conf, schema, tbl_list, file_list, dir_processed, separator=' ', with_quotes=False, null_string=''):
    "Load files into tables: tables in parallel, files sequentially within each table"
    # ---------------------------------------------------------
    ret = True
    try:
        print("load_files_parallel 1")
        time1 = time.time()
        # ---------------------------------------------------------
        # One worker per table; file_list[idx] holds the file patterns for tbl_list[idx]
        # ---------------------------------------------------------
        with ProcessPoolExecutor(int(db_conf['max_workers'])) as executor:
            futures = [executor.submit(load_files, db_conf, schema, tbl_name, file_list[idx], dir_processed, separator, with_quotes, null_string) for idx, tbl_name in enumerate(tbl_list)]
            for future in as_completed(futures):
                if not future.result():
                    ret = False
                    print('load_files_parallel stopped with errors at ' + datetime.now().strftime("%d/%m/%Y, %H:%M:%S"))
                    break
        if ret:
            msg = '\n[' + datetime.now().strftime("%d/%m/%Y, %H:%M:%S") + '] load_files_parallel execution time: '
            msg += calc_time(time.time() - time1) + "\n"
            print(msg)
        # ---------------------------------------------------------
        print("load_files_parallel 2")
    except:
        ret = False
        err = sys.exc_info()
        print("Function = {0}, Error = {1}, {2}".format("load_files_parallel", err[0], err[1]))
    return(ret)
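# Usage sketch (table names and patterns are illustrative assumptions):
#   tbl_list  = ['patient', 'consultation']
#   file_list = [['D:\\data\\patient_*.csv'], ['D:\\data\\consultation_*.csv']]
#   load_files_parallel(db_conf, 'source', tbl_list, file_list,
#                       'D:\\data\\processed\\', separator=',', with_quotes=True)
# The two lists are parallel: file_list[i] belongs to tbl_list[i].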
# ---------------------------------------------------------
def get_table_count(db_conf, tbl_name, tbl_result, cnx=None):
    # Count the records of tbl_name and store the count in tbl_result
    # ---------------------------------------------------------
    ret = True
    new_connection = False
    cursor1 = None
    try:
        if cnx is None:
            new_connection = True
            cnx = sql.connect(
                user=db_conf['username'],
                password=db_conf['password'],
                database=db_conf['database']
            )
            cnx.autocommit = True
        cursor1 = cnx.cursor()
        # Check if table exists
        query1 = 'SELECT to_regclass(\'' + tbl_name + '\')'
        cursor1.execute(query1)
        present = cursor1.fetchone()[0]
        if present is None:
            records = '0'
        # Count records
        else:
            query1 = 'select count(*) from ' + tbl_name
            cursor1.execute(query1)
            records = str(cursor1.fetchone()[0])
        schema_tbl, tbl_name_short = tbl_name.split(".")
        schema_tbl_result, tbl_result_short = tbl_result.split(".")
        # Store results in the result table, keyed by the short table name
        tbl_name_short = "\'" + tbl_name_short + "\'"
        query1 = 'select * from ' + tbl_result + ' where tbl_name = ' + tbl_name_short
        cursor1.execute(query1)
        present = cursor1.fetchone()
        if schema_tbl == db_conf['target_schema']:
            if present is None:
                query1 = 'insert INTO ' + tbl_result + ' (tbl_name, total_records) VALUES (' + tbl_name_short + ', ' + records + ')'
                cursor1.execute(query1)
                print(f'{tbl_name} row count: {records}')
        else:
            if present is None:
                query1 = 'insert INTO ' + tbl_result + ' (tbl_name, ' + schema_tbl + '_records) VALUES (' + tbl_name_short + ', ' + records + ')'
                cursor1.execute(query1)
                print(f'{tbl_name} row count: {records}')
            else:
                query1 = 'update ' + tbl_result + ' SET ' + schema_tbl + '_records = ' + records + ' where tbl_name = ' + tbl_name_short
                cursor1.execute(query1)
                print(f'{tbl_name} row count: {records}')
            # Refresh the total from the schema-specific counts
            query1 = 'update ' + tbl_result + ' SET total_records = COALESCE(' + schema_tbl_result + '_records,0) + COALESCE(' + schema_tbl_result + '_nok_records,0) where tbl_name = ' + tbl_name_short
            cursor1.execute(query1)
        cursor1.close()
        cursor1 = None
        if new_connection:
            cnx.close()
    except:
        ret = False
        err = sys.exc_info()
        print("Function = {0}, Error = {1}, {2}".format("get_table_count", err[0], err[1]))
        if cursor1 is not None:
            cursor1.close()
        if new_connection and cnx is not None:
            cnx.close()
    return(ret)
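# Result-table layout assumed by get_table_count (illustrative):
#   tbl_name | <schema>_records | <schema>_nok_records | total_records
# Target-schema tables only receive total_records on first insert; other
# schemas write their own <schema>_records column and the total is then
# recomputed from the _records and _nok_records columns.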
# ---------------------------------------------------------
def get_table_count_parallel(db_conf, tbl_list, tbl_records):
    # Count the records of several tables in parallel
    # ---------------------------------------------------------
    ret = True
    try:
        time1 = time.time()
        with ProcessPoolExecutor(int(db_conf['max_workers'])) as executor:
            futures = [executor.submit(get_table_count, db_conf, tbl, tbl_records) for tbl in tbl_list]
            for future in as_completed(futures):
                if not future.result():
                    ret = False
                    print('get_table_count_parallel stopped with error at ' + datetime.now().strftime("%d/%m/%Y, %H:%M:%S"))
                    break
        if ret:
            msg = '[' + datetime.now().strftime("%d/%m/%Y, %H:%M:%S") + '] get_table_count_parallel execution time: '
            msg += calc_time(time.time() - time1) + "\n"
            print(msg)
    except:
        ret = False
        err = sys.exc_info()
        print("Function = {0}, Error = {1}, {2}".format("get_table_count_parallel", err[0], err[1]))
    return(ret)
# ---------------------------------------------------------
def get_table_max_ids(db_conf, tbl_name, tbl_result, cnx=None):
    # Store the maximum id of a table (its first column) in the result table
    # ---------------------------------------------------------
    ret = True
    new_connection = False
    cursor1 = None
    try:
        if cnx is None:
            new_connection = True
            cnx = sql.connect(
                user=db_conf['username'],
                password=db_conf['password'],
                database=db_conf['database']
            )
            cnx.autocommit = True
        schema_tbl, tbl_name_short = tbl_name.split(".")
        cursor1 = cnx.cursor()
        # Check if table exists
        query1 = 'SELECT to_regclass(\'' + tbl_name + '\')'
        cursor1.execute(query1)
        present = cursor1.fetchone()[0]
        if present is None:
            query1 = 'INSERT INTO ' + tbl_result + ' (tbl_name, max_id) \
                VALUES (\'' + tbl_name_short + '\', 0)'
            cursor1.execute(query1)
        # Calculate the max id: the alias t1(col1) renames the first column to col1
        else:
            query1 = 'INSERT INTO ' + tbl_result + ' (tbl_name, max_id) \
                VALUES (\'' + tbl_name_short + '\', \
                (select coalesce(MAX(col1),0) from (select col1 from ' + tbl_name + ' as t1(col1)) as max_id))'
            cursor1.execute(query1)
            print(f'{tbl_name} max_id calculated')
        cursor1.close()
        cursor1 = None
        if new_connection:
            cnx.close()
    except:
        ret = False
        err = sys.exc_info()
        print("Function = {0}, Error = {1}, {2}".format("get_table_max_ids", err[0], err[1]))
        if cursor1 is not None:
            cursor1.close()
        if new_connection and cnx is not None:
            cnx.close()
    return(ret)
# ---------------------------------------------------------
def get_table_max_ids_parallel(db_conf, tbl_list, tbl_result):
    # Store the maximum ids of several tables in parallel
    # ---------------------------------------------------------
    ret = True
    try:
        time1 = time.time()
        with ProcessPoolExecutor(int(db_conf['max_workers'])) as executor:
            futures = [executor.submit(get_table_max_ids, db_conf, tbl, tbl_result) for tbl in tbl_list]
            for future in as_completed(futures):
                if not future.result():
                    ret = False
                    print('get_table_max_ids_parallel stopped with error at ' + datetime.now().strftime("%d/%m/%Y, %H:%M:%S"))
                    break
        if ret:
            msg = '[' + datetime.now().strftime("%d/%m/%Y, %H:%M:%S") + '] get_table_max_ids_parallel execution time: '
            msg += calc_time(time.time() - time1) + "\n"
            print(msg)
    except:
        ret = False
        err = sys.exc_info()
        print("Function = {0}, Error = {1}, {2}".format("get_table_max_ids_parallel", err[0], err[1]))
    return(ret)
# ---------------------------------------------------------
def parse_queries_file(db_conf, filename, chunk_id=None):
    # Read a SQL file, strip comments and substitute {PLACEHOLDER} tokens from db_conf
    # ---------------------------------------------------------
    # Placeholder values; any key missing from db_conf is left unsubstituted
    replacements = {
        '{DATABASE}': db_conf.get('database'),
        '{SOURCE_NOK_SCHEMA}': db_conf.get('source_nok_schema'),
        '{SOURCE_SCHEMA}': db_conf.get('source_schema'),
        '{TARGET_SCHEMA}': db_conf.get('target_schema'),
        '{TARGET_SCHEMA_TO_LINK}': db_conf.get('target_schema_to_link'),
        '{VOCABULARY_SCHEMA}': db_conf.get('vocabulary_schema'),
        '{CHUNK_SCHEMA}': db_conf.get('chunk_schema'),
        '{RESULT_SCHEMA}': db_conf.get('result_schema'),
        '{LOOKUP_DIRECTORY}': db_conf.get('dir_lookup'),
        '{VOCABULARY_DIRECTORY}': db_conf.get('dir_voc'),
        '{STCM_DIRECTORY}': db_conf.get('dir_stcm'),
        '{MEDICAL_DICTIONARY}': db_conf.get('medical_dictionary_filename'),
        '{PRODUCT_DICTIONARY}': db_conf.get('product_dictionary_filename'),
        '{CHUNK_SIZE}': str(db_conf['chunk_size']) if 'chunk_size' in db_conf else None,
        '{CHUNK_ID}': chunk_id,
        '{SOURCE_RELEASE_DATE}': db_conf.get('source_release_date'),
    }
    # Statements are assumed to be separated by ';' (a semicolon inside a
    # string literal would break this simple split)
    with open(filename) as f_in:
        query_list = f_in.read().split(';')
    for idx, item in enumerate(query_list):
        query = sqlparse.format(item, strip_comments=True).strip()
        for placeholder, value in replacements.items():
            if value is not None:
                query = query.replace(placeholder, value)
        query_list[idx] = query
    return(query_list)
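# Example (illustrative): with db_conf = {'source_schema': 'source',
# 'target_schema': 'omop'}, a file containing
#   INSERT INTO {TARGET_SCHEMA}.person SELECT * FROM {SOURCE_SCHEMA}.patient;
# is returned as ['INSERT INTO omop.person SELECT * FROM source.patient', '']
# (the trailing empty string comes from the final semicolon; the callers
# skip empty queries).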
# ---------------------------------------------------------
def execute_query(db_conf, query, debug=True):
    # Execute a single query on a new connection
    # ---------------------------------------------------------
    ret = True
    cnx = None
    cursor1 = None
    try:
        cnx = sql.connect(
            user=db_conf['username'],
            password=db_conf['password'],
            database=db_conf['database']
        )
        cursor1 = cnx.cursor()
        cnx.autocommit = True
        if debug:
            time1 = time.time()
            msg = '[' + datetime.now().strftime("%d/%m/%Y, %H:%M:%S") + '] Processing: ' + query.split('\n')[0]
            print(msg)
        cursor1.execute(query)
        if debug:
            msg = '[' + datetime.now().strftime("%d/%m/%Y, %H:%M:%S") + '] Query execution time: '
            msg += calc_time(time.time() - time1) + "\n"
            print(msg)
        cursor1.close()
        cursor1 = None
        cnx.close()
    except:
        ret = False
        err = sys.exc_info()
        print("Function = {0}, Error = {1}, {2}".format("execute_query", err[0], err[1]))
        if cursor1 is not None:
            cursor1.close()
        if cnx is not None:
            cnx.close()
    return(ret)
# ---------------------------------------------------------
def execute_multiple_queries(db_conf, filename, chunk_id=None, cnx=None, commit=True, debug=True, move_files=True):
    # Execute all queries of a SQL file, sequentially, on one connection
    # ---------------------------------------------------------
    ret = True
    new_connection = False
    cursor1 = None
    try:
        if os.path.isfile(filename):
            if cnx is None:
                new_connection = True
                cnx = sql.connect(
                    user=db_conf['username'],
                    password=db_conf['password'],
                    database=db_conf['database']
                )
                cnx.autocommit = commit
            cursor1 = cnx.cursor()
            queries = parse_queries_file(db_conf, filename, chunk_id)
            for query in queries:
                if query != '':
                    if debug:
                        time1 = time.time()
                        msg = '[' + datetime.now().strftime("%d/%m/%Y, %H:%M:%S") + '] Processing: ' + query.split('\n')[0]
                        print(msg)
                    cursor1.execute(query)
                    if debug:
                        msg = '[' + datetime.now().strftime("%d/%m/%Y, %H:%M:%S") + '] Finished : ' + query.split('\n')[0] + ' in '
                        msg += calc_time(time.time() - time1) + "\n"
                        print(msg)
            if move_files:
                dir_sql_processed = os.getcwd() + '\\sql_scripts' + db_conf['dir_processed']
                file_processed = dir_sql_processed + os.path.basename(filename)
                os.rename(filename, file_processed)
            cursor1.close()
            cursor1 = None
            if new_connection:
                cnx.close()
    except:
        ret = False
        err = sys.exc_info()
        print("Function = {0}, Error = {1}, {2}".format("execute_multiple_queries", err[0], err[1]))
        if cursor1 is not None:
            cursor1.close()
        if new_connection and cnx is not None:
            cnx.close()
    return(ret)
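# Usage sketch (the file name is an illustrative assumption):
#   execute_multiple_queries(db_conf, 'sql_scripts/10_create_tables.sql')
# With commit=False and an explicit cnx, the statements run in the caller's
# transaction and must be committed by the caller; move_files=True expects
# the processed folder configured in db_conf['dir_processed'] under
# .\sql_scripts to exist.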
# ---------------------------------------------------------
def execute_sql_file_parallel(db_conf, fname, debug=True, move_files=True):
    # The queries in the file are executed in parallel
    # ---------------------------------------------------------
    ret = True
    try:
        time1 = time.time()
        query_list = parse_queries_file(db_conf, fname)
        with ProcessPoolExecutor(int(db_conf['max_workers'])) as executor:
            futures = [executor.submit(execute_query, db_conf, query, debug) for query in query_list if query != '']
            # Retrieve the results in completion order
            for future in as_completed(futures):
                if not future.result():
                    ret = False
                    print('execute_sql_file_parallel stopped with error at ' + datetime.now().strftime("%d/%m/%Y, %H:%M:%S"))
                    break
        if ret:
            msg = '[' + datetime.now().strftime("%d/%m/%Y, %H:%M:%S") + '] execute_sql_file_parallel execution time: '
            msg += calc_time(time.time() - time1) + "\n"
            print(msg)
            if move_files:
                dir_sql_processed = os.getcwd() + '\\sql_scripts' + db_conf['dir_processed']
                file_processed = dir_sql_processed + os.path.basename(fname)
                os.rename(fname, file_processed)
    except:
        ret = False
        err = sys.exc_info()
        print("Function = {0}, Error = {1}, {2}".format("execute_sql_file_parallel", err[0], err[1]))
    return(ret)
# ---------------------------------------------------------
def execute_sql_files_parallel(db_conf, fname_list, debug=True):
    # The SQL files are executed in parallel, each on its own connection
    # ---------------------------------------------------------
    ret = True
    try:
        time1 = time.time()
        fname_list = [fname for fname in fname_list if os.path.isfile(fname)]
        if len(fname_list) > 0:
            with ProcessPoolExecutor(int(db_conf['max_workers'])) as executor:
                futures = [executor.submit(execute_multiple_queries, db_conf, fname, commit=True, debug=debug) for fname in fname_list]
                for future in as_completed(futures):
                    if not future.result():
                        ret = False
                        print('execute_sql_files_parallel stopped with error at ' + datetime.now().strftime("%d/%m/%Y, %H:%M:%S"))
                        break
            if ret:
                msg = '[' + datetime.now().strftime("%d/%m/%Y, %H:%M:%S") + '] execute_sql_files_parallel execution time: '
                msg += calc_time(time.time() - time1) + "\n"
                print(msg)
    except:
        ret = False
        err = sys.exc_info()
        print("Function = {0}, Error = {1}, {2}".format("execute_sql_files_parallel", err[0], err[1]))
    return(ret)
# ---------------------------------------------------------
def unzip_file(fname, extraction_method, extraction_folder):
    # Unzip one archive with 7-Zip and move it to the processed directory
    # ---------------------------------------------------------
    ret = True
    try:
        # ---------------------------------------------------------
        # 7zip command 'e' = Extract
        # 7zip command 'x' = eXtract with full paths
        # 7zip switch '-o' = set Output directory. NO space between -o and the output directory (e.g. "-oC:\Temp")
        # 7zip switch '-bso0' = disable stream of standard output
        # 7zip switch '-aos' = skip extracting of existing files
        # ---------------------------------------------------------
        print("\nUnzipping file = {0}".format(fname))
        # ---------------------------------------------------------
        # Unzip with 7Z
        # ---------------------------------------------------------
        dir_downloaded = os.path.dirname(fname)
        if subprocess.call(['7z', extraction_method, fname, '-bso0', '-aos', '-o' + extraction_folder]) != 0:
            ret = False
        # ---------------------------------------------------------
        # Move zipped file to PROCESSED directory
        # ---------------------------------------------------------
        else:
            file_processed = dir_downloaded + '\\processed\\' + os.path.basename(fname)
            os.rename(fname, file_processed)
    except:
        ret = False
        err = sys.exc_info()
        print("Function = {0}, Error = {1}, {2}".format("unzip_file", err[0], err[1]))
    return(ret)
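# The call above is equivalent to running, e.g. (illustrative paths):
#   7z x D:\downloads\extract.zip -bso0 -aos -oD:\data
# and requires 7z to be on the PATH; a non-zero exit code is reported as a
# failure, and only successfully extracted archives are moved.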
# ---------------------------------------------------------
def execute_unzip_parallel(file_list, extraction_method, extraction_folder):
    # The zipped files are unzipped in parallel, one worker per file
    # ---------------------------------------------------------
    ret = True
    try:
        time1 = time.time()
        with ProcessPoolExecutor(len(file_list)) as executor:
            futures = [executor.submit(unzip_file, file_list[idx], extraction_method[idx], extraction_folder[idx]) for idx, fname in enumerate(file_list)]
            # Retrieve the results in completion order
            for future in as_completed(futures):
                if not future.result():
                    ret = False
                    print('Parallel execution stopped with error at ' + datetime.now().strftime("%d/%m/%Y, %H:%M:%S"))
                    break
        if ret:
            msg = '\n[' + datetime.now().strftime("%d/%m/%Y, %H:%M:%S") + '] Parallel execution time: '
            msg += calc_time(time.time() - time1) + "\n"
            print(msg)
    except:
        ret = False
        err = sys.exc_info()
        print("Function = {0}, Error = {1}, {2}".format("execute_unzip_parallel", err[0], err[1]))
    return(ret)
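# Usage sketch (archive names and folders are illustrative assumptions):
#   execute_unzip_parallel(['D:\\dl\\a.zip', 'D:\\dl\\b.7z'],
#                          ['x', 'e'], ['D:\\data\\a', 'D:\\data\\b'])
# The three lists are parallel: each archive gets its own extraction method
# and output folder.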
# ---------------------------------------------------------
def load_folders(db_conf, schema, folder):
    "Load the files of one folder into the table named after the folder"
    # ---------------------------------------------------------
    ret = True
    try:
        print("load_folders 1")
        data_provider = db_conf['data_provider']
        if data_provider == 'cprd':
            file_list = sorted(glob.iglob(folder + '\\*.txt'))
        elif data_provider == 'iqvia':
            file_list = sorted(glob.iglob(folder + '\\*.csv')) + sorted(glob.iglob(folder + '\\*.out'))
        elif data_provider == 'thin':
            file_list = sorted(glob.iglob(folder + '\\*.csv'))
        elif data_provider == 'ukbiobank':
            file_list = sorted(glob.iglob(folder + '\\*.tsv'))
        elif data_provider == 'ukb':
            file_list = sorted(glob.iglob(folder + '\\*.*'))
        else:
            file_list = []
        # ---------------------------------------------------------
        # If list is not empty
        # ---------------------------------------------------------
        if file_list:
            # ---------------------------------------------------------
            # Connect to db
            # ---------------------------------------------------------
            cnx = sql.connect(
                user=db_conf['username'],
                password=db_conf['password'],
                database=db_conf['database']
            )
            cnx.autocommit = True
            cursor1 = cnx.cursor()
            cursor1.execute('SET datestyle TO \'' + db_conf['datestyle'] + '\'')
            cursor1.execute('SET search_path TO ' + schema)
            # ---------------------------------------------------------
            time1 = time.time()
            # ---------------------------------------------------------
            # Create PROCESSED directory if it does not exist
            # ---------------------------------------------------------
            dir_processed = folder + db_conf['dir_processed']
            if not os.path.exists(dir_processed):
                os.makedirs(dir_processed)
            # The target table is named after the last component of the folder
            pos = folder.rfind('\\')
            tbl_name = folder[pos+1:].lower()
            # ---------------------------------------------------------
            for fname in file_list:
                print("File = {0}".format(fname))
                # ---------------------------------------------------------
                # Read the file into memory with provider-specific cleaning;
                # iqvia files use the delimiter ò = E'\242' = E'\xF2'
                # (encoding cp437), converted to a space before loading,
                # and the mojibake sequence 'ï¼' is repaired to '-'
                # ---------------------------------------------------------
                stream = StringIO()
                if data_provider == 'cprd' or data_provider == 'ukb':
                    with open(fname, errors='ignore') as f_in:
                        stream.write(f_in.read().replace('\\', ''))
                elif data_provider == 'iqvia':
                    with open(fname, errors='backslashreplace') as f_in:
                        stream.write(f_in.read().replace('ò', ' ').replace('ï¼', '-').replace('\\x8d', '').replace('\\x81', '').replace('\\x8f', '').replace('\\x90', '').replace('\\x9d', '').replace('\\xd9', ''))
                elif data_provider == 'thin':
                    with open(fname, errors='ignore') as f_in:
                        stream.write(f_in.read().replace('\\', ''))
                elif data_provider == 'ukbiobank':
                    with open(fname, errors='ignore') as f_in:
                        stream.write(f_in.read().replace('\\', '').replace('\u0000', ''))
                stream.seek(0)
                stream.readline()  # Skip the header line
                if data_provider == 'ukb':
                    if tbl_name == 'baseline' or tbl_name == 'cancer':
                        cursor1.copy_expert("COPY " + tbl_name + " FROM STDIN WITH (FORMAT CSV, delimiter ',', quote '\"', NULL 'NA')", stream)
                    else:
                        cursor1.copy_expert("COPY " + tbl_name + " FROM STDIN WITH (FORMAT CSV, delimiter ' ', quote '\u0007', NULL '')", stream)
                else:
                    cursor1.copy_from(stream, tbl_name, sep=' ', null='')
                # ---------------------------------------------------------
                # Move loaded file to PROCESSED directory
                # ---------------------------------------------------------
                file_processed = dir_processed + os.path.basename(fname)
                os.rename(fname, file_processed)
            cursor1.close()
            cnx.close()
            processing_time = "Files loaded in {0}".format(calc_time(time.time() - time1))
            print(processing_time)
        # ---------------------------------------------------------
        print("load_folders 2")
    except:
        ret = False
        err = sys.exc_info()
        print("Function = {0}, Error = {1}, {2}".format("load_folders", err[0], err[1]))
    return(ret)
# ---------------------------------------------------------
def load_folders_parallel(db_conf, schema, folder_list):
    "Load folders into tables: folders in parallel, files sequentially within each folder"
    # ---------------------------------------------------------
    ret = True
    try:
        print("load_folders_parallel 1")
        time1 = time.time()
        # ---------------------------------------------------------
        # One worker per folder; each folder maps to one table
        # ---------------------------------------------------------
        with ProcessPoolExecutor(int(db_conf['max_workers'])) as executor:
            futures = [executor.submit(load_folders, db_conf, schema, folder) for folder in folder_list]
            for future in as_completed(futures):
                if not future.result():
                    ret = False
                    print('load_folders_parallel stopped with errors at ' + datetime.now().strftime("%d/%m/%Y, %H:%M:%S"))
                    break
        if ret:
            msg = '\n[' + datetime.now().strftime("%d/%m/%Y, %H:%M:%S") + '] load_folders_parallel execution time: '
            msg += calc_time(time.time() - time1) + "\n"
            print(msg)
        # ---------------------------------------------------------
        print("load_folders_parallel 2")
    except:
        ret = False
        err = sys.exc_info()
        print("Function = {0}, Error = {1}, {2}".format("load_folders_parallel", err[0], err[1]))
    return(ret)