Skip to content

Commit 38d6e2b

Browse files
authored
Merge pull request #24 from oxford-pharmacoepi/anto_server2
Anto server2
2 parents bc41c40 + 0d112d0 commit 38d6e2b

19 files changed

+503
-81
lines changed

3_load_cdm_vocabulary.py

+41-12
Original file line numberDiff line numberDiff line change
@@ -281,17 +281,30 @@ def main():
281281
dir_stcm_old = db_conf['dir_stcm'] + "\\old\\"
282282
if not os.path.exists(dir_stcm_old):
283283
os.makedirs(dir_stcm_old)
284-
285284
# ---------------------------------------------------------
286-
# Drop vocabularies tables - Parallel execution of queries in the file - Ask the user for DROP confirmation
285+
# If database does not exist, create database
286+
# ---------------------------------------------------------
287+
(ret, exist) = mapping_util.does_db_exist(db_conf)
288+
if exist == False:
289+
ret = mapping_util.create_db(db_conf)
290+
if ret == True:
287291
# ---------------------------------------------------------
288-
qa = input('Are you sure you want to DROP all the CDM vocabulary tables (y/n):')
289-
while qa.lower() not in ['y', 'n', 'yes', 'no']:
290-
qa = input('I did not understand that. Are you sure you want to DROP all the CDM vocabulary tables (y/n):')
291-
if qa.lower() in ['y', 'yes']:
292-
fname = dir_sql + '3a_cdm_drop_vocabulary.sql'
292+
# Create the schemas
293+
# ---------------------------------------------------------
294+
fname = dir_sql + '4__schema_create.sql'
293295
print('Calling ' + fname + ' ...')
294296
ret = mapping_util.execute_sql_file_parallel(db_conf, fname, False)
297+
if ret == True:
298+
# ---------------------------------------------------------
299+
# Drop vocabularies tables - Parallel execution of queries in the file - Ask the user for DROP confirmation
300+
# ---------------------------------------------------------
301+
qa = input('Are you sure you want to DROP all the CDM vocabulary tables (y/n):')
302+
while qa.lower() not in ['y', 'n', 'yes', 'no']:
303+
qa = input('I did not understand that. Are you sure you want to DROP all the CDM vocabulary tables (y/n):')
304+
if qa.lower() in ['y', 'yes']:
305+
fname = dir_sql + '3a_cdm_drop_vocabulary.sql'
306+
print('Calling ' + fname + ' ...')
307+
ret = mapping_util.execute_sql_file_parallel(db_conf, fname, False)
295308
# ---------------------------------------------------------
296309
# Create vocabularies tables - Parallel execution of queries in the file - Ask the user for CREATE/LOAD confirmation
297310
# ---------------------------------------------------------
@@ -307,20 +320,36 @@ def main():
307320
# Load vocabularies tables - Parallel execution
308321
# ---------------------------------------------------------
309322
if ret == True:
323+
data_provider = db_conf['data_provider']
324+
prefix = ''
325+
if data_provider == 'cprd':
326+
extension = '.txt'
327+
separator = ' '
328+
elif data_provider == 'iqvia':
329+
extension = '.csv' # + sorted(glob.iglob(folder + '\\*.out'))
330+
separator = ' '
331+
elif data_provider == 'thin':
332+
extension = '.csv'
333+
separator = ';'
334+
prefix = 'OMOP_'
335+
elif data_provider == 'ukbiobank':
336+
extension = '.tsv'
337+
separator = ' '
310338
tbl_cdm_voc = [tbl for tbl in db_conf['tbl_cdm_voc']]
311-
file_list = [[dir_voc + tbl + '.csv'] for tbl in tbl_cdm_voc]
312-
ret = mapping_util.load_files_parallel(db_conf, vocabulary_schema, tbl_cdm_voc, file_list, dir_voc_processed)
339+
file_list = [[dir_voc + prefix + tbl + extension] for tbl in tbl_cdm_voc]
340+
with_quotes = True
341+
ret = mapping_util.load_files_parallel(db_conf, vocabulary_schema, tbl_cdm_voc, file_list, dir_voc_processed, separator, with_quotes)
313342
if ret == True:
314343
print('Finished loading cdm vocabulary.')
315344
# ---------------------------------------------------------
316-
# Create vocabularies PK, indexes - Parallel execution
345+
# Create vocabularies PK, indexes, FKs - Parallel execution
317346
# ---------------------------------------------------------
318347
if ret == True:
319-
qa = input('Are you sure you want to CREATE PK/IDXs for all the vocabulary tables (y/n):')
348+
qa = input('Are you sure you want to CREATE PK/IDXs/FKs for all the vocabulary tables (y/n):')
320349
while qa.lower() not in ['y', 'n', 'yes', 'no']:
321350
qa = input('I did not understand that. Are you sure you want to CREATE PK/IDXs for all the vocabulary tables (y/n):')
322351
if qa.lower() in ['y', 'yes']:
323-
print('Build PKs and IDXs ...')
352+
print('Build PKs, IDXs and FKs ...')
324353
sql_file_list = sorted(glob.iglob(dir_sql + '3c_cdm_pk_idx_*.sql'))
325354
ret = mapping_util.execute_sql_files_parallel(db_conf, sql_file_list, True)
326355
# ---------------------------------------------------------

4_load_mapped_data.py

+105
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,105 @@
import os
import sys
import glob
import time
from datetime import datetime
from io import StringIO
from pathlib import Path
from random import shuffle
from concurrent.futures import ProcessPoolExecutor
from concurrent.futures import as_completed
from importlib.machinery import SourceFileLoader
from importlib import import_module

# Project helper module, loaded from the directory of this script.
mapping_util = SourceFileLoader('mapping_util', os.path.dirname(os.path.realpath(__file__)) + '/mapping_util.py').load_module()

# ---------------------------------------------------------
# HELPERS
# ---------------------------------------------------------
def _confirm(prompt):
    """Ask a yes/no question until a valid answer is given; return True for yes."""
    qa = input(prompt)
    while qa.lower() not in ['y', 'n', 'yes', 'no']:
        qa = input('I did not understand that. ' + prompt)
    return qa.lower() in ['y', 'yes']

def _file_format(data_provider):
    """Return (extension, separator, prefix) of the mapped CDM files for *data_provider*.

    Raises ValueError for an unknown provider — the original code silently fell
    through the if/elif chain and later crashed with NameError.
    """
    # NOTE(review): the ' ' separators for the .txt/.tsv providers look
    # suspicious — possibly '\t' was intended; confirm against the raw files.
    formats = {
        'cprd': ('.txt', ' ', ''),
        'iqvia': ('.csv', ' ', ''),
        'thin': ('.csv', ';', 'OMOP_'),
        'ukbiobank': ('.tsv', ' ', ''),
    }
    try:
        return formats[data_provider]
    except KeyError:
        raise ValueError('Unknown data_provider: ' + str(data_provider))

# ---------------------------------------------------------
# MAIN PROGRAM
# ---------------------------------------------------------
def main():
    """Create the CDM database/schemas if needed, optionally drop/recreate the
    CDM tables, and load the mapped data files into them (parallel loads)."""
    ret = True
    global db_conf

    try:
        (ret, dir_study, db_conf, debug) = mapping_util.get_parameters()
        if ret == True and dir_study != '':
            target_schema = db_conf['target_schema']
            vocabulary_schema = db_conf['vocabulary_schema']
            dir_sql = os.getcwd() + '\\sql_scripts\\'
            dir_sql_processed = os.getcwd() + '\\sql_scripts' + db_conf['dir_processed']
            if not os.path.exists(dir_sql_processed):
                os.makedirs(dir_sql_processed)
            dir_cdm = db_conf['dir_cdm_data'] + '/'
            dir_cdm_processed = db_conf['dir_cdm_data'] + db_conf['dir_processed']
            if not os.path.exists(dir_cdm_processed):
                os.makedirs(dir_cdm_processed)
            # ---------------------------------------------------------
            # If database does not exist, create database
            # ---------------------------------------------------------
            (ret, exist) = mapping_util.does_db_exist(db_conf)
            if exist == False:
                ret = mapping_util.create_db(db_conf)
            if ret == True:
                # ---------------------------------------------------------
                # Create the schemas
                # ---------------------------------------------------------
                fname = dir_sql + '4__schema_create.sql'
                print('Calling ' + fname + ' ...')
                ret = mapping_util.execute_sql_file_parallel(db_conf, fname, False)
                if ret == True:
                    # ---------------------------------------------------------
                    # Create/Recreate CDM tables? Ask the user for DROP confirmation
                    # ---------------------------------------------------------
                    if _confirm('Are you sure you want to DROP/CREATE all the CDM tables (y/n):'):
                        fname = dir_sql + '4a_cdm_drop_tbl.sql'
                        print('Calling ' + fname + ' ...')
                        ret = mapping_util.execute_multiple_queries(db_conf, fname, None, None, True, debug)
                        if ret == True:
                            cdm_version = db_conf['cdm_version']
                            if cdm_version == '5.3':
                                fname = dir_sql + '4b_OMOPCDM_postgresql_5_3_ddl.sql'
                            elif cdm_version == '5.4':
                                fname = dir_sql + '4b_OMOPCDM_postgresql_5_4_ddl.sql'
                            else:
                                # FIX: original fell through here, leaving fname
                                # pointing at the drop script and re-running it.
                                raise ValueError('Unsupported cdm_version: ' + str(cdm_version))
                            print('Calling ' + fname + ' ...')
                            ret = mapping_util.execute_sql_file_parallel(db_conf, fname, False, True)
                    # ---------------------------------------------------------
                    # Load CDM tables - Parallel execution
                    # ---------------------------------------------------------
                    if ret == True:
                        if _confirm('Are you sure you want to LOAD all the CDM tables (y/n):'):
                            (extension, separator, prefix) = _file_format(db_conf['data_provider'])
                            tbl_cdm = [tbl for tbl in db_conf['tbl_cdm']]
                            file_list = [[dir_cdm + prefix + tbl + '*' + extension] for tbl in tbl_cdm]
                            ret = mapping_util.load_files_parallel(db_conf, target_schema, tbl_cdm, file_list, dir_cdm_processed, separator)
                            if ret == True:
                                print('Finished loading MAPPED data.')
    # FIX: bare 'except:' also swallowed SystemExit/KeyboardInterrupt.
    except Exception:
        print(str(sys.exc_info()[1]))

# ---------------------------------------------------------
# Protect entry point
# ---------------------------------------------------------
if __name__ == "__main__":
    main()

9_load_achilles_dqd.py

+127
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,127 @@
import os
import sys
import time
import glob
from datetime import datetime
import psycopg2 as sql
from importlib.machinery import SourceFileLoader

# Project helper module, loaded from the directory of this script.
mapping_util = SourceFileLoader('mapping_util', os.path.dirname(os.path.realpath(__file__)) + '/mapping_util.py').load_module()

# ---------------------------------------------------------
# HELPERS
# ---------------------------------------------------------
def _confirm(prompt, reprompt=None):
    """Ask a yes/no question until a valid answer is given; return True for yes.

    reprompt: text used for retries; defaults to the prompt prefixed with
    'I did not understand that. ' (the wording the original script used).
    """
    if reprompt is None:
        reprompt = 'I did not understand that. ' + prompt
    qa = input(prompt)
    while qa.lower() not in ['y', 'n', 'yes', 'no']:
        qa = input(reprompt)
    return qa.lower() in ['y', 'yes']

def _file_format(data_provider):
    """Return (extension, separator) of the Achilles/DQD data files for *data_provider*.

    Raises ValueError for an unknown provider — the original code silently fell
    through the if/elif chain and later crashed with NameError.
    """
    # NOTE(review): the ' ' separators for the .txt/.tsv providers look
    # suspicious — possibly '\t' was intended; confirm against the raw files.
    formats = {
        'cprd': ('.txt', ' '),
        'iqvia': ('.csv', ' '),
        'thin': ('.csv', ','),
        'ukbiobank': ('.tsv', ' '),
    }
    try:
        return formats[data_provider]
    except KeyError:
        raise ValueError('Unknown data_provider: ' + str(data_provider))

# ---------------------------------------------------------
# MAIN PROGRAM
# ---------------------------------------------------------
def main():
    """Drop/create/load the Achilles and DQD result tables, build their
    PKs/indexes, count records, and optionally archive the SQL code files."""
    ret = True
    global db_conf

    try:
        (ret, dir_study, db_conf, debug) = mapping_util.get_parameters()
        if ret == True and dir_study != '':
            database_name = db_conf['database']
            result_schema = db_conf['result_schema']
            dir_sql = os.getcwd() + '\\sql_scripts\\'
            dir_sql_processed = os.getcwd() + '\\sql_scripts' + db_conf['dir_processed']
            # FIX: original never created dir_sql_processed, but the final
            # "move code" step renames files into it.
            if not os.path.exists(dir_sql_processed):
                os.makedirs(dir_sql_processed)
            dir_data = db_conf['dir_cdm_data'] + '/'
            dir_data_processed = db_conf['dir_cdm_data'] + db_conf['dir_processed']
            if not os.path.exists(dir_data_processed):
                os.makedirs(dir_data_processed)
            # ---------------------------------------------------------
            # Ask the user for DROP confirmation
            # ---------------------------------------------------------
            if _confirm('Are you sure you want to DROP ' + database_name + ' Achilles and DQD tables (y/n):'):
                fname = dir_sql + '9a_cdm_post_drop.sql'
                print('Calling ' + fname + ' ...')
                ret = mapping_util.execute_sql_file_parallel(db_conf, fname, False)
            if ret == True:
                # ---------------------------------------------------------
                # Ask the user for LOAD confirmation
                # ---------------------------------------------------------
                if _confirm('Are you sure you want to CREATE/LOAD ' + database_name + ' Achilles and DQD tables (y/n):'):
                    # Create tables, then load data in parallel.
                    fname = dir_sql + '9b_cdm_post_create.sql'
                    print('Calling ' + fname + ' ...')
                    ret = mapping_util.execute_sql_file_parallel(db_conf, fname, False)
                    if ret == True:
                        (extension, separator) = _file_format(db_conf['data_provider'])
                        tbl_list = db_conf['tbl_cdm_post']
                        file_list = [[dir_data + '*' + tbl + '*' + extension] for tbl in tbl_list]
                        ret = mapping_util.load_files_parallel(db_conf, result_schema, tbl_list, file_list, dir_data_processed, separator)
                        if ret == True:
                            # FIX: original printed 'Finished loading cdm vocabulary.'
                            # (copy-paste from 3_load_cdm_vocabulary.py).
                            print('Finished loading Achilles and DQD tables.')
            # ---------------------------------------------------------
            # Ask the user for PK/IDX creation confirmation
            # ---------------------------------------------------------
            if ret == True:
                if _confirm('Are you sure you want to CREATE PK/IDXs on the ' + database_name + ' Achilles and DQD tables (y/n):',
                            'Are you sure you want to CREATE PK/IDXs on all the ' + database_name + ' Achilles and DQD tables (y/n):'):
                    print('Build PKs and IDXs ...')
                    sql_file_list = sorted(glob.iglob(dir_sql + '9c_cdm_post_pk_idx_*.sql'))
                    ret = mapping_util.execute_sql_files_parallel(db_conf, sql_file_list, True)
                    if ret == True:
                        print('Finished adding ' + database_name + ' Achilles/DQD PKs/indexes')
            # ---------------------------------------------------------
            # Ask the user for RECORD COUNTS confirmation
            # ---------------------------------------------------------
            if ret == True:
                prompt_count = 'Are you sure you want to COUNT the records for all the ' + database_name + ' tables (y/n):'
                if _confirm(prompt_count, prompt_count):
                    # FIX: original referenced names that are undefined in this
                    # script (tbl_db_list, tbl_db, source_schema — copied from a
                    # sibling loader) and always raised NameError here. Apparent
                    # intent is to count the Achilles/DQD tables handled above —
                    # TODO confirm against the sibling scripts.
                    tbl_list_count = [result_schema + '.' + tbl for tbl in db_conf['tbl_cdm_post']]
                    ret = mapping_util.get_table_count_parallel(db_conf, tbl_list_count, result_schema + '._records')
                    if ret == True:
                        print('Finished counting on ' + database_name + ' data\n')
            # ---------------------------------------------------------
            # Move CODE to the processed directory?
            # ---------------------------------------------------------
            if ret == True:
                if _confirm('Are you sure you want to MOVE all the source CODE in the "processed" folder (y/n):'):
                    for f in glob.iglob(dir_sql + '1*.sql'):
                        file_processed = dir_sql_processed + os.path.basename(f)
                        os.rename(f, file_processed)
                    print('Finished MOVING code files')
    # FIX: bare 'except:' also swallowed SystemExit/KeyboardInterrupt.
    except Exception:
        print(str(sys.exc_info()[1]))

# ---------------------------------------------------------
# Protect entry point
# ---------------------------------------------------------
if __name__ == "__main__":
    main()

0 commit comments

Comments
 (0)