
Commit bb8f9ee

Added null_string
Added null_string in load_files and load_files_parallel
1 parent d6a3e6c commit bb8f9ee
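
In both loaders, the new null_string argument names the literal field value that PostgreSQL's COPY should load as SQL NULL (previously hard-coded to ''). For the THIN provider it is set to 'NA', so an unquoted NA field in a row such as

1001,NA,2019-01-01

(an illustrative row, not an actual THIN record) is loaded as NULL rather than as the text 'NA'.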

2 files changed: +12 -6


9_load_achilles_dqd.py (+5, -1)
@@ -56,6 +56,8 @@ def main():
 	if ret == True:
 		data_provider = db_conf['data_provider']
 		prefix = ''
+		with_quotes = False
+		null_string = ''
 		if data_provider == 'cprd':
 			extension = '.txt'
 			separator = ' '
@@ -65,6 +67,8 @@ def main():
 		elif data_provider == 'thin':
 			extension = '.csv'
 			separator = ','
+			with_quotes = True
+			null_string = 'NA'
 		elif data_provider == 'ukbiobank':
 			extension = '.tsv'
 			separator = ' '
@@ -73,7 +77,7 @@ def main():
 		print(tbl_list_full)
 		file_list = [[dir_data + '*' + tbl + '*' + extension] for tbl in tbl_list]
 		print(file_list)
-		ret = mapping_util.load_files_parallel(db_conf, result_schema, tbl_list, file_list, dir_data_processed, separator)
+		ret = mapping_util.load_files_parallel(db_conf, result_schema, tbl_list, file_list, dir_data_processed, separator, with_quotes, null_string)
 		if ret == True:
 			print('Finished loading cdm vocabulary.')
 	# ---------------------------------------------------------
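
For the 'thin' branch this means the loader is now called with the quoting and null options set. Spelled out with keyword arguments, the updated call is roughly the following sketch (the other arguments come from earlier in main(); the option values are those set in the diff above):

ret = mapping_util.load_files_parallel(
	db_conf, result_schema, tbl_list, file_list, dir_data_processed,
	separator=',', with_quotes=True, null_string='NA')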

mapping_util.py (+7, -5)
@@ -158,7 +158,7 @@ def does_tbl_exist(cnx, tbl_name):
 	return(ret, exist)
 
 # ---------------------------------------------------------
-def load_files(db_conf, schema, tbl_name, file_list, dir_processed, separator, with_quotes):
+def load_files(db_conf, schema, tbl_name, file_list, dir_processed, separator, with_quotes, null_string):
 	"Load files into tables"
 	# ---------------------------------------------------------
 	ret = True
@@ -195,15 +195,17 @@ def load_files(db_conf, schema, tbl_name, file_list, dir_processed, separator, with_quotes):
 		stream = StringIO()
 		if data_provider == 'ukbiobank':
 			stream.write(open(fname, encoding='cp1252', errors = 'ignore').read().replace('\\', ''))
+		# elif data_provider == 'thin':
+		# 	stream.write(open(fname, errors = 'ignore').read().replace('\\', '').replace(',NA,', ',,'))
 		else:
 			stream.write(open(fname, errors = 'ignore').read().replace('\\', ''))
 			# stream.write(open(fname, errors = 'ignore').read().replace('\\', '').replace('\u0000', ''))
 		stream.seek(0)
 		stream.readline() #To avoid headers
 		if with_quotes == False:
-			cursor1.copy_from(stream, tbl_name, sep = separator, null = '')
+			cursor1.copy_from(stream, tbl_name, sep = separator, null = null_string)
 		else:
-			cursor1.copy_expert("COPY " + tbl_name + " FROM STDIN WITH (FORMAT CSV, delimiter '" + separator + "', quote '\"')", stream)
+			cursor1.copy_expert("COPY " + tbl_name + " FROM STDIN WITH (FORMAT CSV, delimiter '" + separator + "', quote '\"', NULL '" + null_string + "')", stream)
 		# ---------------------------------------------------------
 		# Move loaded file to PROCESSED directory
 		# ---------------------------------------------------------
@@ -222,7 +224,7 @@ def load_files(db_conf, schema, tbl_name, file_list, dir_processed, separator, with_quotes):
 	return(ret)
 
 # ---------------------------------------------------------
-def load_files_parallel(db_conf, schema, tbl_list, file_list, dir_processed, separator = ' ', with_quotes = False):
+def load_files_parallel(db_conf, schema, tbl_list, file_list, dir_processed, separator = ' ', with_quotes = False, null_string = ''):
 	"Load files into tables"
 	# ---------------------------------------------------------
 	ret = True
@@ -234,7 +236,7 @@ def load_files_parallel(db_conf, schema, tbl_list, file_list, dir_processed, separator = ' ', with_quotes = False):
 	# Load files in parallel (all tables), sequentially within each table
 	# ---------------------------------------------------------
 	with ProcessPoolExecutor(int(db_conf['max_workers'])) as executor:
-		futures = [executor.submit(load_files, db_conf, schema, tbl_name, file_list[idx], dir_processed, separator, with_quotes) for idx, tbl_name in enumerate(tbl_list)]
+		futures = [executor.submit(load_files, db_conf, schema, tbl_name, file_list[idx], dir_processed, separator, with_quotes, null_string) for idx, tbl_name in enumerate(tbl_list)]
 		for future in as_completed(futures):
 			if future.result() == False:
 				ret = False
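
A minimal, self-contained sketch of what the two COPY paths in load_files now do with null_string, assuming a psycopg2 connection; the DSN, the table name demo_tbl and the sample rows are hypothetical, only the null handling mirrors the diff:

from io import StringIO
import psycopg2

cnx = psycopg2.connect('dbname=test')   # hypothetical DSN
null_string = 'NA'                      # value used for the 'thin' provider

with cnx, cnx.cursor() as cur:
	cur.execute('CREATE TEMP TABLE demo_tbl (id int, val text)')

	# Path 1 (with_quotes == False): copy_from now treats fields equal to
	# null_string as SQL NULL instead of the previously hard-coded ''.
	stream = StringIO('1,NA\n2,hello\n')
	cur.copy_from(stream, 'demo_tbl', sep=',', null=null_string)

	# Path 2 (with_quotes == True): copy_expert runs COPY ... FORMAT CSV with an
	# explicit NULL option; an unquoted NA becomes NULL, quoted values stay text.
	stream = StringIO('3,NA\n4,"world"\n')
	cur.copy_expert("COPY demo_tbl FROM STDIN WITH (FORMAT CSV, delimiter ',', "
		"quote '\"', NULL '" + null_string + "')", stream)

psycopg2 commits the transaction on a clean exit from the connection context manager; after the load, rows 1 and 3 have val IS NULL.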
