import os
import sys
import glob
import time
import psycopg2 as sql
import importlib.util
# SourceFileLoader.load_module() is deprecated (removed in Python 3.12), so load the helper via importlib.util
_spec = importlib.util.spec_from_file_location('mapping_util', os.path.join(os.path.dirname(os.path.realpath(__file__)), 'mapping_util.py'))
mapping_util = importlib.util.module_from_spec(_spec)
_spec.loader.exec_module(mapping_util)
# ---------------------------------------------------------
# MAIN PROGRAM
# ---------------------------------------------------------
def main():
ret = True
cnx = None
try:
(ret, dir_study, db_conf, debug) = mapping_util.get_parameters()
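		# db_conf is expected to provide every key referenced below: database_type, database,
		# username, password, cdm_version, dir_processed, vocabulary_schema, target_schema,
		# chunk_schema, source_release_date, data_provider, cdm_etl_reference, tbl_cdm,
		# chunk_limit and, for linked datasets, target_schema_to_link.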
if ret == True and dir_study != '':
time0 = time.time()
database_type = db_conf['database_type']
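			# NOTE: Windows-style path separators are assumed below;
			# db_conf['dir_processed'] is expected to supply its own leading separator.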
			dir_sql = os.getcwd() + '\\sql_scripts\\'
			dir_sql_processed = os.getcwd() + '\\sql_scripts' + db_conf['dir_processed']
if not os.path.exists(dir_sql_processed):
os.makedirs(dir_sql_processed)
# ---------------------------------------------------------
# Create the schemas, if not present
# ---------------------------------------------------------
fname = dir_sql + '4__schema_create.sql'
print('Calling ' + fname + ' ...')
ret = mapping_util.execute_sql_file_parallel(db_conf, fname, False, False)
if ret == True:
# ---------------------------------------------------------
				# Create/recreate the CDM tables? The DDL queries run in parallel; ask the user to confirm the DROP first
# ---------------------------------------------------------
qa = input('Are you sure you want to DROP/CREATE all the CDM tables (y/n):')
while qa.lower() not in ['y', 'n', 'yes', 'no']:
qa = input('I did not understand that. Are you sure you want to DROP/CREATE all the CDM tables (y/n):')
if qa.lower() in ['y', 'yes']:
fname = dir_sql + '4a_cdm_drop_tbl.sql'
print('Calling ' + fname + ' ...')
ret = mapping_util.execute_multiple_queries(db_conf, fname, None, None, True, debug, False)
if ret == True:
						cdm_version = db_conf['cdm_version']
						if cdm_version[:3] == '5.3':
							fname = dir_sql + '4b_OMOPCDM_postgresql_5_3_ddl.sql'
						elif cdm_version[:3] == '5.4':
							fname = dir_sql + '4b_OMOPCDM_postgresql_5_4_ddl.sql'
						else:
							raise ValueError('Unsupported cdm_version: ' + cdm_version)
print('Calling ' + fname + ' ...')
ret = mapping_util.execute_sql_file_parallel(db_conf, fname, False, False)
# ---------------------------------------------------------
# Connect to db
# ---------------------------------------------------------
if ret == True:
database = db_conf['database']
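				# host/port are not set here; psycopg2 falls back to libpq defaults
				# (PGHOST/PGPORT environment variables or the local socket)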
cnx = sql.connect(
user=db_conf['username'],
password=db_conf['password'],
database=database
)
cursor1 = cnx.cursor()
cnx.autocommit = True
# ---------------------------------------------------------
# Insert record in cdm_source
# ---------------------------------------------------------
vocabulary_schema = db_conf['vocabulary_schema']
target_schema = db_conf['target_schema']
chunk_schema = db_conf['chunk_schema']
source_release_date = db_conf['source_release_date']
source_description = db_conf['data_provider'] + ' ' + database_type
cdm_holder = 'NDORMS'
source_documentation_reference = 'https://github.com/oxford-pharmacoepi/etl_ndorms'
cdm_etl_reference = db_conf['cdm_etl_reference']
cdm_version = db_conf['cdm_version']
query1 = 'select 1 from ' + target_schema + '.cdm_source'
cursor1.execute(query1)
rec_found = cursor1.fetchone()
				if rec_found is None:
print('Inserting record in CDM_SOURCE ...')
query1 = 'INSERT INTO ' + target_schema + '.cdm_source \
select \
\'' + database + '\', \
\'' + database[:25] + '\', \
\'' + cdm_holder + '\', \
\'' + source_description + '\', \
\'' + source_documentation_reference + '\', \
\'' + cdm_etl_reference + '\', \
TO_DATE(\'' + source_release_date + '\',\'YYYY-MM-DD\'), \
CURRENT_DATE, \'' + \
						cdm_version[0:3] + '\',' # store major.minor only; the full patch version makes DQD fail
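					# the cdm_version_concept_id column only exists from CDM v5.4 onwards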
if cdm_version[0:3] >= '5.4':
query1 += '(select concept_id from ' + vocabulary_schema + '.concept WHERE domain_id = \'Metadata\' \
and standard_concept = \'S\' \
and invalid_reason is null \
and position(lower(\'OMOP CDM Version\') in lower(concept_name)) > 0 \
and position(\'' + cdm_version + '\' in concept_name) > 0), '
query1 += '(SELECT vocabulary_version FROM ' + vocabulary_schema + '.vocabulary WHERE vocabulary_id = \'None\')'
cursor1.execute(query1)
# ---------------------------------------------------------
# If this is a linked dataset, create/recreate _max_ids table in target_schema_to_link
# ---------------------------------------------------------
if ret == True:
if 'target_schema_to_link' in db_conf and db_conf['target_schema_to_link'] != db_conf['target_schema']:
target_schema_to_link = db_conf['target_schema_to_link']
qa = input('Do you want to CREATE/RECREATE the _max_ids table in ' + target_schema_to_link + '? (y/n):').lower()
while qa not in ['y', 'n', 'yes', 'no']:
							qa = input('I did not understand that. Do you want to CREATE/RECREATE the _max_ids table in ' + target_schema_to_link + '? (y/n):').lower()
if qa in ['y', 'yes']:
tbl_max_ids = target_schema_to_link + '._max_ids'
							query1 = 'DROP TABLE IF EXISTS ' + tbl_max_ids + ' CASCADE'
cursor1.execute(query1)
query1 = 'CREATE TABLE ' + tbl_max_ids + ' \
(tbl_name varchar(25) NOT NULL, \
max_id bigint DEFAULT 0) TABLESPACE pg_default;'
cursor1.execute(query1)
time1 = time.time()
tbl_list_count = [target_schema_to_link + "." + tbl for tbl in db_conf['tbl_cdm']]
ret = mapping_util.get_table_max_ids_parallel(db_conf, tbl_list_count, tbl_max_ids)
if ret == True:
query1 = 'with cte as (SELECT MAX(max_id) as max_id FROM ' + tbl_max_ids + ' WHERE tbl_name in \
(\'condition_occurrence\', \'device_exposure\', \'drug_exposure\', \'measurement\', \
\'observation\', \'procedure_occurrence\', \'visit_detail\', \'visit_occurrence\')) \
INSERT INTO ' + tbl_max_ids + ' (tbl_name, max_id) \
SELECT \'max_of_all\', max_id \
									FROM cte'
cursor1.execute(query1)
query1 = 'ALTER TABLE ' + tbl_max_ids + ' ADD CONSTRAINT pk_max_ids PRIMARY KEY (tbl_name);'
cursor1.execute(query1)
msg = 'Finished calculating max_ids in ' + target_schema_to_link.upper() + ' data in ' + mapping_util.calc_time(time.time() - time1) + '\n'
print(msg)
# ---------------------------------------------------------
# Tables to load: PERSON, OBSERVATION_PERIOD, etc.
# ---------------------------------------------------------
if ret == True:
qa = input('Do you want to map the simple tables: PERSON, OBSERVATION_PERIOD, etc. (y/n):').lower()
while qa not in ['y', 'n', 'yes', 'no']:
						qa = input('I did not understand that. Do you want to map the simple tables: LOCATION, CARE_SITE, PROVIDER, PERSON, DEATH, OBSERVATION_PERIOD (y/n):').lower()
if qa in ['y', 'yes']:
fname = dir_sql + '4c_' + database_type + '_map_tbl_simple.sql'
print('Executing ' + fname + ' ... (PERSON, OBSERVATION_PERIOD, etc.)')
ret = mapping_util.execute_multiple_queries(db_conf, fname, None, None, True, debug, False)
# ---------------------------------------------------------
# Tables to load: TEMP_CONCEPT_MAP, TEMP_DRUG_CONCEPT_MAP, TEMP_VISIT_DETAIL
# ---------------------------------------------------------
if ret == True:
if database_type in ['aurum', 'ukb_gp', 'ukb_hesin', 'ukb_cancer', 'ncrascr']:
qa = input('Do you want to CREATE/RECREATE the temp tables (TEMP_CONCEPT_MAP, TEMP_DRUG_CONCEPT_MAP, TEMP_VISIT_DETAIL)? (y/n):').lower()
while qa not in ['y', 'n', 'yes', 'no']:
							qa = input('I did not understand that. Do you want to CREATE/RECREATE the temp tables (TEMP_CONCEPT_MAP, TEMP_DRUG_CONCEPT_MAP, TEMP_VISIT_DETAIL)? (y/n):').lower()
if qa in ['y', 'yes']:
fname = dir_sql + '4d' + db_conf['cdm_version'][2] + '_' + database_type + '_map_tbl_tmp.sql'
print('Executing ' + fname + ' ... (TEMP_CONCEPT_MAP, TEMP_DRUG_CONCEPT_MAP, TEMP_VISIT_DETAIL)')
ret = mapping_util.execute_multiple_queries(db_conf, fname, None, None, True, debug, False)
# ---------------------------------------------------------
# Tables to load: VISIT_OCCURRENCE, VISIT_DETAIL
# ---------------------------------------------------------
if ret == True:
					if database_type != 'ukb': # ukb baseline has no event tables
qa = input('Do you want to CREATE/RECREATE the visit tables (VISIT_OCCURRENCE, VISIT_DETAIL)? (y/n):').lower()
while qa not in ['y', 'n', 'yes', 'no']:
qa = input('I did not understand that. Do you want to CREATE/RECREATE the visit tables (VISIT_OCCURRENCE, VISIT_DETAIL)? (y/n):').lower()
if qa in ['y', 'yes']:
fname = dir_sql + '4e' + db_conf['cdm_version'][2] + '_' + database_type + '_map_tbl_visit.sql'
print('Executing ' + fname + ' ... (VISIT_OCCURRENCE, VISIT_DETAIL)')
ret = mapping_util.execute_multiple_queries(db_conf, fname, None, None, True, debug, False)
# ---------------------------------------------------------
# Create/Recreate CHUNK table and any chunk job previously done?
# ---------------------------------------------------------
if ret == True:
if database_type not in ('hesop', 'ukb'): #They do not use STEM and do not need chunking
qa = input('Do you want to CREATE/RECREATE the chunk table and remove any chunk work previously done? (y/n):').lower()
while qa not in ['y', 'n', 'yes', 'no']:
qa = input('I did not understand that. Do you want to CREATE/RECREATE the chunk table and remove any chunk work previously done? (y/n):').lower()
if qa in ['y', 'yes']:
# ---------------------------------------------------------
# Delete possible old stem_source_x and stem_x tables
# ---------------------------------------------------------
(ret, exist) = mapping_util.does_tbl_exist(cnx, chunk_schema + '.chunk')
if ret == True and exist == True:
query1 = 'SELECT stem_source_tbl, stem_tbl FROM ' + chunk_schema + '.chunk WHERE completed = 1'
cursor1.execute(query1)
tbl_array = cursor1.fetchall()
								for (stem_source_tbl, stem_tbl) in tbl_array:
									cursor1.execute('DROP TABLE IF EXISTS ' + chunk_schema + '.' + stem_source_tbl)
									if stem_tbl is not None:
										cursor1.execute('DROP TABLE IF EXISTS ' + chunk_schema + '.' + stem_tbl)
fname = dir_sql + '4f_' + database_type + '_map_tbl_chunk.sql'
print('Executing ' + fname + ' ... (CHUNK)')
ret = mapping_util.execute_multiple_queries(db_conf, fname, None, None, True, debug, False)
							# Re-run 4b here in case the chunk script dropped the CDM tables
							if ret == True:
								cdm_version = db_conf['cdm_version']
								if cdm_version[:3] == '5.3':
									fname = dir_sql + '4b_OMOPCDM_postgresql_5_3_ddl.sql'
								elif cdm_version[:3] == '5.4':
									fname = dir_sql + '4b_OMOPCDM_postgresql_5_4_ddl.sql'
								else:
									raise ValueError('Unsupported cdm_version: ' + cdm_version)
print('Calling ' + fname + ' ...')
ret = mapping_util.execute_sql_file_parallel(db_conf, fname, False, False)
# ---------------------------------------------------------
# Start/Restart chunking
# ---------------------------------------------------------
if ret == True:
qa = input('Would you like to progress with chunking? (y/n):').lower()
while qa not in ['y', 'n', 'yes', 'no']:
qa = input('I did not understand that. Would you like to progress with chunking? (y/n):').lower()
if qa in ['y', 'yes']:
# ---------------------------------------------------------
# Analyse already created tables before chunking
# ---------------------------------------------------------
tbl_list = [target_schema + "." + tbl for tbl in ('care_site', 'death', 'location', 'observation_period', 'person', 'provider', 'visit_detail', 'visit_occurrence')]
for tbl in tbl_list:
query1 = 'VACUUM (ANALYZE) ' + tbl
print('Executing ' + query1)
cursor1.execute(query1)
# ---------------------------------------------------------
# Select not completed chunk ids
# ---------------------------------------------------------
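						# use explicit transactions from here on, so each chunk's 4g/4h/4i
						# statements commit together (see cnx.commit() in the loop below)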
cnx.autocommit = False
chunks_time1 = time.time()
query1 = 'SELECT distinct chunk_id FROM ' + chunk_schema + '.chunk where completed = 0 order by chunk_id'
chunk_limit = db_conf['chunk_limit']
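						# a positive chunk_limit caps how many chunks run in this session; otherwise all pending chunks run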
if chunk_limit > 0:
query1 += ' limit ' + str(chunk_limit)
query1 += ';'
cursor1.execute(query1)
chunk_id_array = cursor1.fetchall()
						chunk_id_list = [row[0] for row in chunk_id_array]
						# Temporarily disable autovacuum on the CDM tables while chunking
tbl_list = [target_schema + "." + tbl for tbl in db_conf['tbl_cdm']]
for tbl in tbl_list:
query1 = 'ALTER TABLE ' + tbl + ' SET (autovacuum_enabled = False)'
cursor1.execute(query1)
# ---------------------------------------------------------
# Loop through the chunks executing 4g, 4h and 4i each time before commit
# ---------------------------------------------------------
move_files = False
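						# move_files stays False until the last chunk, so the 4g/4h/4i scripts
						# are only moved to the processed folder once all chunks are done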
for chunk_id in chunk_id_list:
							print(f'Executing chunk {chunk_id} / {chunk_id_list[-1]}')
chunk_time1 = time.time()
if chunk_id == chunk_id_list[-1]:
move_files = True
fname = dir_sql + '4g_' + database_type + '_map_tbl_stem_source.sql'
print('Executing ' + fname + ' ... (STEM_SOURCE)')
ret = mapping_util.execute_multiple_queries(db_conf, fname, str(chunk_id), cnx, False, debug, move_files)
if ret == True:
fname = dir_sql + '4h_' + database_type + '_map_tbl_stem.sql'
print('Executing ' + fname + ' ... (STEM)')
ret = mapping_util.execute_multiple_queries(db_conf, fname, str(chunk_id), cnx, False, debug, move_files)
if ret == True:
fname = dir_sql + '4i' + db_conf['cdm_version'][2] + '_' + database_type + '_map_tbl_cdm.sql'
print('Executing ' + fname + ' ... (CONDITION_OCCURRENCE, DEVICE_EXPOSURE, DRUG_EXPOSURE, MEASUREMENT, OBSERVATION, PROCEDURE_OCCURRENCE, SPECIMEN)')
ret = mapping_util.execute_multiple_queries(db_conf, fname, str(chunk_id), cnx, False, debug, move_files)
if ret == True:
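										# one commit per chunk: a failure rolls back only the current chunk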
cnx.commit()
msg = mapping_util.calc_time(time.time() - chunk_time1)
										print(f'Chunk {chunk_id} finished in {msg}')
if ret == False:
break
if ret == True:
# Analyse tables after chunking
cnx.autocommit = True
for tbl in tbl_list:
query1 = 'VACUUM (ANALYZE) ' + tbl
print('Executing ' + query1)
cursor1.execute(query1)
# Re-enable Autovacuum after chunking
for tbl in tbl_list:
query1 = 'ALTER TABLE ' + tbl + ' SET (autovacuum_enabled = True)'
cursor1.execute(query1)
msg = mapping_util.calc_time(time.time() - chunks_time1)
print(f'Full CHUNK process completed in {msg}')
cnx.close()
# ---------------------------------------------------------
# Report total time
# ---------------------------------------------------------
if ret == True:
process_finished = "{0} completed in {1}".format(os.path.basename(__file__), mapping_util.calc_time(time.time() - time0))
print(process_finished)
# ---------------------------------------------------------
# Move CODE to the processed directory?
# ---------------------------------------------------------
if ret == True:
			qa = input('Are you sure you want to MOVE all the mapping CODE to the "processed" folder (y/n):')
			while qa.lower() not in ['y', 'n', 'yes', 'no']:
				qa = input('I did not understand that. Are you sure you want to MOVE all the mapping CODE to the "processed" folder (y/n):')
if qa.lower() in ['y', 'yes']:
for f in glob.iglob(dir_sql + '4*.sql'):
file_processed = dir_sql_processed + os.path.basename(f)
os.rename(f, file_processed)
print('Finished MOVING code files')
	except Exception:
		if cnx is not None:
			cnx.rollback()
		print(str(sys.exc_info()[1]))
# ---------------------------------------------------------
# Protect entry point
# ---------------------------------------------------------
if __name__ == "__main__":
main()