@@ -20,22 +20,26 @@ def get_total_cores():
20
20
thread_per_core = int (awk .communicate ()[0 ])
21
21
return os .cpu_count ()// thread_per_core
22
22
23
- def get_file_size (file_path ):
24
- """Gets the size of the file in bytes."""
25
- size = subprocess .check_output (["wc" , "-c" , file_path ])
26
- size = int (size .decode ("utf-8" ).split ()[0 ])
27
- return size
28
-
29
- def get_core_list (cores_per_process ):
23
def get_numa_nodes():
    """Return the number of NUMA nodes reported by ``lscpu``.

    ``lscpu`` is run once and its output is scanned for the line containing
    ``NUMA node(s):``; the third whitespace-separated field of that line is
    the node count (the same field the previous ``grep | awk '{print $3}'``
    pipeline extracted).

    Returns:
        int: The NUMA node count parsed from the ``lscpu`` output.

    Raises:
        ValueError: If no usable ``NUMA node(s):`` line is present or its
            value is not an integer.
        subprocess.CalledProcessError: If ``lscpu`` exits with a non-zero
            status.
    """
    # A single check_output call waits for the child and closes its pipe,
    # unlike a chain of Popen objects whose upstream processes were never
    # waited on.
    lscpu_output = subprocess.check_output(["lscpu"]).decode("utf-8", errors="replace")
    for line in lscpu_output.splitlines():
        if "NUMA node(s):" not in line:
            continue
        fields = line.split()
        # fields == ["NUMA", "node(s):", "<count>"]
        if len(fields) >= 3:
            return int(fields[2])
    raise ValueError("could not find a 'NUMA node(s):' entry in lscpu output")
31
+
32
def get_file_size(file_path):
    """Gets the size of the file in bytes.

    The size is read from the file's metadata via ``os.path.getsize``
    instead of spawning ``wc -c``, so no child process is created and a
    path that begins with ``-`` is not misread as a command-line option.

    Args:
        file_path: Path of the file to measure.

    Returns:
        int: Size of the file in bytes.

    Raises:
        OSError: If the file does not exist or is inaccessible.
    """
    return os.path.getsize(file_path)
36
37
38
+ def get_core_list (cores_per_process ):
39
+
40
+ numa_nodes = get_numa_nodes ()
37
41
core_min_max = []
38
- cores_in_numa = os . cpu_count ()
42
+ cores_in_numa = get_total_cores ()
39
43
for i in range (numa_nodes ):
40
44
lscpu = subprocess .Popen (["lscpu" ], stdout = subprocess .PIPE )
41
45
grep = subprocess .Popen (["grep" , "NUMA node" + str (i ) + " CPU(s):" ], stdin = lscpu .stdout , stdout = subprocess .PIPE )
@@ -51,7 +55,7 @@ def get_core_list(cores_per_process):
51
55
for j in range (cores_in_numa // cores_per_process ):
52
56
core_list = core_list + list (range (core_min_max [i ][0 ] + j * cores_per_process , core_min_max [i ][0 ] + (j + 1 )* cores_per_process ))
53
57
else : # single process case or single socket case
54
- core_list = range (os . cpu_count () // 2 )
58
+ core_list = range (get_total_cores () )
55
59
56
60
return core_list , numa_nodes
57
61
@@ -108,12 +112,7 @@ def update_queue(result):
108
112
109
113
def create_process_list (files , MIN_MEM_PER_PROCESS , MIN_CORES_PER_PROCESS , LOAD_BALANCE_FACTOR ):
110
114
total_cores = get_total_cores ()
111
- #numa_nodes
112
- lscpu = subprocess .Popen (["lscpu" ], stdout = subprocess .PIPE )
113
- grep = subprocess .Popen (["grep" , "NUMA node(s):" ], stdin = lscpu .stdout , stdout = subprocess .PIPE )
114
- awk = subprocess .Popen (["awk" , "{print $3}" ], stdin = grep .stdout , stdout = subprocess .PIPE )
115
- #Get the output
116
- numa_nodes = int (awk .communicate ()[0 ])
115
+ numa_nodes = get_numa_nodes ()
117
116
cores_per_numa = total_cores // numa_nodes
118
117
119
118
#sockets
@@ -135,9 +134,10 @@ def create_process_list(files, MIN_MEM_PER_PROCESS, MIN_CORES_PER_PROCESS, LOAD_
135
134
print ("Memory max {}, Core max {}, Load Balance max {}" .format (mm , cm , lbm ))
136
135
max_p = min (mm , cm , lbm )
137
136
138
- # number which is 2^x and less than or equal to max
137
+ # largest number which is 2^x and less than or equal to max
139
138
max_p = 2 ** int (math .log2 (max_p ))
140
139
140
+ # TODO: Have linear backoff for max_p when number of files are less
141
141
max_processes_list = []
142
142
while max_p > 0 :
143
143
max_processes_list .append (max_p * numa_nodes )
@@ -155,6 +155,7 @@ def start_process_list(files, max_processes_list, bash_subprocess):
155
155
for max_processes in max_processes_list :
156
156
os .environ ["OMP_NUM_THREADS" ] = str (total_cores // max_processes )
157
157
print ("Number of OMP Threads = {}, for {} instances" .format (os .environ .get ('OMP_NUM_THREADS' ), max_processes ))
158
+ # TODO: Have linear backoff condition when number of files are less
158
159
if len (files ) >= max_processes :
159
160
returned_files = multiprocessing_run (files , max_processes , bash_subprocess )
160
161
print ("Following protein files couldn't be processed with {} instances" .format (max_processes ))
0 commit comments