-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy path1_load_source_data.py
203 lines (197 loc) · 9.4 KB
/
1_load_source_data.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
import os
import sys
import time
import glob
from datetime import datetime
import psycopg2 as sql
from importlib.machinery import SourceFileLoader
mapping_util = SourceFileLoader('mapping_util', os.path.dirname(os.path.realpath(__file__)) + '/mapping_util.py').load_module()
# ---------------------------------------------------------
def is_curation_needed_aurum(tbl_patient, tbl_observation):
    """Check whether the loaded AURUM source tables need curation.

    Curation is flagged when ``tbl_patient`` contains at least one row with
    ``acceptable = 0``, a ``gender`` of 0, 3, 4 or NULL, ``yob < 1875`` or a
    NULL ``regstartdate``. Only if no such patient row exists is
    ``tbl_observation`` probed for a row with a NULL ``obsdate``.

    Connection settings are read from the module-level ``db_conf`` dict
    (keys ``username``, ``password`` and ``database``).

    Args:
        tbl_patient: Name of the patient table; it is concatenated verbatim
            into the SQL text.
        tbl_observation: Name of the observation table; it is concatenated
            verbatim into the SQL text.

    Returns:
        A ``(ret, curation)`` tuple. ``ret`` is False when any error occurred
        (the error is printed, never raised); ``curation`` is True when one of
        the probes found a row.
    """
    ret = True
    curation = False
    # Table names cannot be sent as bound query parameters, so they are
    # concatenated into the statement text. Each probe only needs to know
    # whether *a* row exists, hence LIMIT 1 on both of them.
    probes = (
        "SELECT 1 FROM " + tbl_patient + " WHERE acceptable = 0 OR gender in (0,3,4) OR gender is null OR yob < 1875 OR regstartdate is null LIMIT 1",
        "SELECT 1 FROM " + tbl_observation + " WHERE obsdate is null LIMIT 1",
    )
    try:
        cnx = sql.connect(
            user=db_conf['username'],
            password=db_conf['password'],
            database=db_conf['database'],
        )
        # Nested try/finally so that the cursor and the connection are
        # released even when a query fails, while any error raised during
        # cleanup is still reported through the handler below.
        try:
            cursor = cnx.cursor()
            try:
                for query in probes:
                    cursor.execute(query)
                    if cursor.fetchone() is not None:
                        curation = True
                        break
            finally:
                cursor.close()
        finally:
            cnx.close()
    except Exception as exc:
        ret = False
        print("Function = {0}, Error = {1}, {2}".format("is_curation_needed_aurum", type(exc), exc))
    return (ret, curation)
# ---------------------------------------------------------
# MAIN PROGRAM
# ---------------------------------------------------------
_RETRY_PREFIX = 'I did not understand that. '


def _confirm(question, retry_prefix=''):
    """Ask a y/n question on stdin until the answer is y/n/yes/no; True means yes.

    ``retry_prefix`` is prepended to ``question`` on every re-prompt.
    """
    answer = input(question)
    while answer.lower() not in ['y', 'n', 'yes', 'no']:
        answer = input(retry_prefix + question)
    return answer.lower() in ['y', 'yes']


def main():
    """Interactively create, load, index, curate and count the source tables.

    Parameters come from ``mapping_util.get_parameters()``; the returned
    configuration is stored in the module-level ``db_conf`` so that
    ``is_curation_needed_aurum`` can read it. The steps run in order, and
    each one is skipped as soon as a previous step reported failure:

    1. create the database if it does not exist, then the schemas;
    2. optionally DROP the source tables;
    3. optionally CREATE the source tables and LOAD the data folders;
    4. optionally build primary keys / indexes;
    5. optionally check for, and run, the curation script;
    6. optionally count the records of every source table;
    7. optionally move the ``1*.sql`` scripts to the processed folder.

    Any exception is printed rather than propagated; nothing is returned.
    """
    global db_conf
    try:
        (ret, dir_study, db_conf, _debug) = mapping_util.get_parameters()
        # The status flags returned by the mapping_util helpers are compared
        # with ``== True`` throughout, exactly as before, because the helpers'
        # return types are not visible from this file.
        if not (ret == True and dir_study != ''):
            return
        time0 = time.time()
        database_type = db_conf['database_type']
        db_label = database_type.upper()
        source_schema = db_conf['source_schema']
        # NOTE: paths are assembled with Windows separators.
        dir_source_files = dir_study + '\\data\\'
        tbl_db = 'tbl_' + database_type
        tbl_db_list = [source_schema + "." + tbl for tbl in db_conf[tbl_db]]
        dir_sql = os.getcwd() + '\\sql_scripts\\'
        dir_sql_processed = os.getcwd() + '\\sql_scripts' + db_conf['dir_processed']
        if not os.path.exists(dir_sql_processed):
            os.makedirs(dir_sql_processed)
        # ---------------------------------------------------------
        # If database does not exist, create database
        # ---------------------------------------------------------
        (ret, exist) = mapping_util.does_db_exist(db_conf)
        if exist == False:
            ret = mapping_util.create_db(db_conf)
        # ---------------------------------------------------------
        # Create the schemas
        # ---------------------------------------------------------
        if ret == True:
            fname = dir_sql + '1__schema_create.sql'
            print('Calling ' + fname + ' ...')
            ret = mapping_util.execute_sql_file_parallel(db_conf, fname, False)
        # ---------------------------------------------------------
        # Ask the user for DROP confirmation
        # ---------------------------------------------------------
        if ret == True:
            if _confirm('Are you sure you want to DROP all the ' + db_label + ' tables (y/n):', _RETRY_PREFIX):
                fname = dir_sql + '1a_' + database_type + '_drop.sql'
                print('Calling ' + fname + ' ...')
                ret = mapping_util.execute_sql_file_parallel(db_conf, fname, False)
        # ---------------------------------------------------------
        # Ask the user for LOAD confirmation
        # ---------------------------------------------------------
        if ret == True:
            if _confirm('Are you sure you want to CREATE/LOAD all the ' + db_label + ' tables (y/n):', _RETRY_PREFIX):
                # Create source tables
                time1 = time.time()
                fname = dir_sql + '1b_' + database_type + '_create.sql'
                print('Calling ' + fname + ' ...')
                ret = mapping_util.execute_sql_file_parallel(db_conf, fname, False)
                # Load source data
                if ret == True:
                    dir_list_folders = [dir_source_files + tbl for tbl in db_conf[tbl_db]]
                    print(dir_list_folders)
                    ret = mapping_util.load_folders_parallel(db_conf, source_schema, dir_list_folders)
                if ret == True:
                    task_finished = "Finished loading " + db_label + " source data in {0}".format(mapping_util.calc_time(time.time() - time1))
                    print(task_finished)
        # ---------------------------------------------------------
        # Ask the user for PK/IDX creation confirmation
        # ---------------------------------------------------------
        if ret == True:
            if _confirm('Are you sure you want to CREATE PK/IDXs on all the ' + db_label + ' tables (y/n):'):
                time1 = time.time()
                print('Build PKs and IDXs ...')
                pk_idx_pattern = dir_sql + '1c_' + database_type + '_pk_idx*.sql'
                sql_file_list = sorted(glob.iglob(pk_idx_pattern))
                print(pk_idx_pattern)
                print(sql_file_list)
                ret = mapping_util.execute_sql_files_parallel(db_conf, sql_file_list, True)
                if ret == True:
                    task_finished = 'Finished adding PKs/indexes to ' + db_label + ' in {0}'.format(mapping_util.calc_time(time.time() - time1))
                    print(task_finished)
        # ---------------------------------------------------------
        # Check for curation
        # ---------------------------------------------------------
        if ret == True:
            if _confirm('Are you sure you want to CHECK/CURATE ' + db_label + ' (y/n):'):
                time1 = time.time()
                # Default to "no curation" so that a database type not listed
                # below skips this step instead of failing on an unbound name.
                curation = False
                if database_type == 'aurum':
                    idx_patient = db_conf[tbl_db].index('patient')
                    idx_observation = db_conf[tbl_db].index('observation')
                    (ret, curation) = is_curation_needed_aurum(tbl_db_list[idx_patient], tbl_db_list[idx_observation])
                elif database_type in ['gold', 'ons', 'ncrascr']:
                    curation = True
                elif database_type[0:3].lower() in ('hes', 'ukb'):
                    curation = True
                if ret == True and curation == True:
                    fname = dir_sql + '1d_' + database_type + '_curation.sql'
                    print('Executing ' + fname + ' ...')
                    ret = mapping_util.execute_multiple_queries(db_conf, fname, None, None, True, True)
                    if ret == True:
                        task_finished = "Finished curation on " + db_label + " data in {0}".format(mapping_util.calc_time(time.time() - time1))
                        print(task_finished)
        # ---------------------------------------------------------
        # Ask the user for RECORD COUNTS confirmation
        # ---------------------------------------------------------
        if ret == True:
            if _confirm('Are you sure you want to COUNT the records for all the ' + db_label + ' tables (y/n):'):
                time1 = time.time()
                fname = dir_sql + '1e_source_records_create.sql'
                print('Calling ' + fname + ' ...')
                ret = mapping_util.execute_multiple_queries(db_conf, fname)
                if ret == True:
                    source_nok_schema = db_conf['source_nok_schema']
                    # Count every table in both the source schema and the
                    # "nok" schema.
                    tbl_list_count = tbl_db_list + [source_nok_schema + "." + tbl for tbl in db_conf[tbl_db]]
                    ret = mapping_util.get_table_count_parallel(db_conf, tbl_list_count, source_schema + '._records')
                    if ret == True:
                        task_finished = "Finished counting on " + db_label + " data in {0}".format(mapping_util.calc_time(time.time() - time1))
                        print(task_finished)
        # ---------------------------------------------------------
        # Move CODE to the processed directory?
        # ---------------------------------------------------------
        if ret == True:
            if _confirm('Are you sure you want to MOVE all the source CODE in the "processed" folder (y/n):', _RETRY_PREFIX):
                # Materialise the file list before renaming so the directory
                # is not modified while it is still being iterated.
                for f in glob.glob(dir_sql + '1*.sql'):
                    file_processed = dir_sql_processed + os.path.basename(f)
                    os.rename(f, file_processed)
                print('Finished MOVING code files')
        # ---------------------------------------------------------
        # Report total time
        # ---------------------------------------------------------
        if ret == True:
            process_finished = "{0} completed in {1}".format(os.path.basename(__file__), mapping_util.calc_time(time.time() - time0))
            print(process_finished)
    except Exception as exc:
        # Top-level boundary: report the error message. KeyboardInterrupt and
        # SystemExit are deliberately not caught, so Ctrl-C at a prompt still
        # stops the script.
        print(str(exc))
# ---------------------------------------------------------
# Protect entry point
# ---------------------------------------------------------
if __name__ == "__main__":
    # Run the interactive loader only when this file is executed as a script,
    # not when it is imported as a module.
    main()