Skip to content

Commit 1366e58

Browse files
dmsuehirashahba
authored andcommitted
Updates to only use docker --privileged when required and check cpuset (intel#150)
* Update numactl usage * Update error handling * Check to make sure it's in cpuset * Update forming cpu list * Style fixes and add unit tests * Test updates * Updates to figure out which cores are on which node * Update to print debug message * Update to organize the cpuset list by node: * Check args to see if docker should run with privileged * Add unit tests * Update numa_cores_per_instance for 'socket' setting * update for cores per instance 'socket' * Add doc update * Remove unused import * Style fixes * update to cpuset list * make str * limit length * update cores_per_node * Move num_physical_cores calculation * update num inter/intra threads * Fix intra threads * Updates to platform util to explain the core lists * Update unit tests * Unit test updates * Add tests with limited cpusets * Remove debug print * Add additional error handling and info * Update base benchmark util * Fix conditionals * Update messages * Update for numa_cores_per_instance 'socket' when sockets have different number of cores * Fix conditionals for checking numa cores per instance socket * Add another unit test to check the case when the --socket-id specified does not have any cores in the cpuset * Removing conditional in the validate function since it's being done in the init function * Add stderr=PIPE so that the terminal doesn't show an error when PlatformUtils is used before numactl is installed
1 parent 22be655 commit 1366e58

38 files changed

+854
-213
lines changed

benchmarks/common/base_benchmark_util.py

+35-16
Original file line numberDiff line numberDiff line change
@@ -315,21 +315,34 @@ def _validate_args(self):
315315
if args.mpi:
316316
raise ValueError("--mpi_num_processes cannot be used together with --numa-cores-per-instance.")
317317

318-
if args.numa_cores_per_instance == "socket":
319-
args.numa_cores_per_instance = self._platform_util.num_cores_per_socket
320-
321-
if args.socket_id != -1:
322-
if int(args.numa_cores_per_instance) > self._platform_util.num_cores_per_socket:
323-
raise ValueError("The number of --numa-cores-per-instance ({}) cannot exceed the "
324-
"number of cores per socket {} when a single socket (--socket-id {}) "
325-
"is being used.".format(args.numa_cores_per_instance,
326-
self._platform_util.num_cores_per_socket,
327-
args.socket_id))
328-
else:
329-
if int(args.numa_cores_per_instance) > system_num_cores:
330-
raise ValueError("The number of --numa-cores-per-instance ({}) cannot exceed the "
331-
"number of system cores ({}).".format(args.numa_cores_per_instance,
332-
system_num_cores))
318+
if args.numa_cores_per_instance != "socket":
319+
if args.socket_id != -1:
320+
if int(args.numa_cores_per_instance) > self._platform_util.num_cores_per_socket:
321+
raise ValueError("The number of --numa-cores-per-instance ({}) cannot exceed the "
322+
"number of cores per socket {} when a single socket (--socket-id {}) "
323+
"is being used.".format(args.numa_cores_per_instance,
324+
self._platform_util.num_cores_per_socket,
325+
args.socket_id))
326+
else:
327+
if int(args.numa_cores_per_instance) > system_num_cores:
328+
raise ValueError("The number of --numa-cores-per-instance ({}) cannot exceed the "
329+
"number of system cores ({}).".format(args.numa_cores_per_instance,
330+
system_num_cores))
331+
332+
# If socket id is specified and we have a cpuset, make sure that there are some cores in the specified socket.
333+
# If cores are limited, then print out a note about that.
334+
if args.socket_id != -1 and self._platform_util.cpuset_cpus:
335+
cpuset_len_for_socket = 0
336+
337+
if args.socket_id in self._platform_util.cpuset_cpus.keys():
338+
cpuset_len_for_socket = len(self._platform_util.cpuset_cpus[args.socket_id])
339+
340+
if cpuset_len_for_socket == 0:
341+
sys.exit("ERROR: There are no socket id {} cores in the cpuset.".format(args.socket_id))
342+
elif cpuset_len_for_socket < self._platform_util.num_cores_per_socket:
343+
print("Note: Socket id {} is specified, but the cpuset has limited this socket to {} cores. "
344+
"This is less than the number of cores per socket on the system ({})".
345+
format(args.socket_id, cpuset_len_for_socket, self._platform_util.num_cores_per_socket))
333346

334347
def initialize_model(self, args, unknown_args):
335348
"""Create model initializer for the specified model"""
@@ -340,7 +353,13 @@ def initialize_model(self, args, unknown_args):
340353
os.path.dirname(os.path.realpath(__file__)))
341354

342355
if args.numa_cores_per_instance == "socket":
343-
args.numa_cores_per_instance = self._platform_util.num_cores_per_socket
356+
if self._platform_util.cpuset_cpus:
357+
if args.socket_id != -1:
358+
args.numa_cores_per_instance = len(self._platform_util.cpuset_cpus[args.socket_id])
359+
else:
360+
args.numa_cores_per_instance = "socket"
361+
else:
362+
args.numa_cores_per_instance = self._platform_util.num_cores_per_socket
344363

345364
# find the path to the model_init.py file
346365
filename = "{}.py".format(self.MODEL_INITIALIZER)

benchmarks/common/base_model_init.py

+62-27
Original file line numberDiff line numberDiff line change
@@ -152,26 +152,38 @@ def run_numactl_multi_instance(self, cmd, replace_unique_output_dir=None):
152152
swap out that path for a path with the instance number in the folder name
153153
so that each instance uses a unique output folder.
154154
"""
155-
# Get the cores list and group them according to the number of cores per instance
156-
cores_per_instance = int(self.args.numa_cores_per_instance)
157-
cpu_cores_list = self.platform_util.cpu_core_list
158-
159-
if self.args.socket_id != -1:
160-
# If it's specified to just use a single socket, then only use the cores from that socket
161-
if len(cpu_cores_list) > self.args.socket_id:
162-
cpu_cores_list = cpu_cores_list[self.args.socket_id]
155+
156+
if self.args.numa_cores_per_instance != "socket":
157+
# Get the cores list and group them according to the number of cores per instance
158+
cores_per_instance = int(self.args.numa_cores_per_instance)
159+
cpu_cores_list = self.platform_util.cpu_core_list
160+
161+
if self.args.socket_id != -1:
162+
# If it's specified to just use a single socket, then only use the cores from that socket
163+
if len(cpu_cores_list) > self.args.socket_id:
164+
cpu_cores_list = cpu_cores_list[self.args.socket_id]
165+
else:
166+
raise ValueError("Error while trying to get the core list for socket {0}. "
167+
"The core list does not have cores for socket {0}.\n "
168+
"Core list: {1}\n".format(self.args.socket_id, str(cpu_cores_list)))
163169
else:
164-
raise ValueError("Error while trying to get the core list for socket {0}. "
165-
"The core list does not have cores for socket {0}.\n "
166-
"Core list: {1}\n".format(self.args.socket_id, str(cpu_cores_list)))
167-
else:
168-
# Using cores from all sockets
169-
combined_core_list = []
170-
for socket_cores in cpu_cores_list:
171-
combined_core_list += socket_cores
172-
cpu_cores_list = combined_core_list
170+
# Using cores from all sockets
171+
combined_core_list = []
172+
for socket_cores in cpu_cores_list:
173+
combined_core_list += socket_cores
174+
cpu_cores_list = combined_core_list
173175

174-
instance_cores_list = self.group_cores(cpu_cores_list, cores_per_instance)
176+
instance_cores_list = self.group_cores(cpu_cores_list, cores_per_instance)
177+
else:
178+
instance_cores_list = []
179+
cores_per_instance = "socket"
180+
# Cores should be grouped based on the cores for each socket
181+
if self.args.socket_id != -1:
182+
# Only using cores from one socket
183+
instance_cores_list[0] = self.platform_util.cpu_core_list[self.args.socket_id]
184+
else:
185+
# Get the cores for each socket
186+
instance_cores_list = self.platform_util.cpu_core_list
175187

176188
# Setup the log file name with the model name, precision, mode, batch size (if there is one),
177189
# number of cores per instance. An extra {} is intentionally left in the log_filename_format
@@ -188,11 +200,14 @@ def run_numactl_multi_instance(self, cmd, replace_unique_output_dir=None):
188200
# Loop through each instance and add that instance's command to a string
189201
multi_instance_command = ""
190202
for instance_num, core_list in enumerate(instance_cores_list):
191-
if len(core_list) < int(cores_per_instance):
203+
if cores_per_instance != "socket" and len(core_list) < int(cores_per_instance):
192204
print("NOTE: Skipping remainder of {} cores for instance {}"
193205
.format(len(core_list), instance_num))
194206
continue
195207

208+
if len(core_list) == 0:
209+
continue
210+
196211
prefix = ("OMP_NUM_THREADS={0} "
197212
"numactl --localalloc --physcpubind={1}").format(
198213
len(core_list), ",".join(core_list))
@@ -340,31 +355,51 @@ def set_num_inter_intra_threads(self, num_inter_threads=None, num_intra_threads=
340355

341356
if self.args.numa_cores_per_instance:
342357
# Set default num inter/intra threads if the user didn't provide specific values
358+
if self.args.numa_cores_per_instance == "socket":
359+
if self.args.socket_id != -1:
360+
inter_threads = len(self.platform_util.cpu_core_list[self.args.socket_id])
361+
else:
362+
# since we can only have one value for inter threads and the number of cores
363+
# per socket can vary, if the cpuset is limited, get the lowest core count
364+
# per socket and use that as the num inter threads
365+
inter_threads = min([len(i) for i in self.platform_util.cpu_core_list if len(i) > 0])
366+
else:
367+
inter_threads = self.args.numa_cores_per_instance
368+
343369
if not self.args.num_inter_threads:
344370
self.args.num_inter_threads = 1
345371
if not self.args.num_intra_threads:
346-
self.args.num_intra_threads = self.args.numa_cores_per_instance
372+
self.args.num_intra_threads = inter_threads
347373
if not self.args.data_num_inter_threads:
348374
self.args.data_num_inter_threads = 1
349375
if not self.args.data_num_intra_threads:
350-
self.args.data_num_intra_threads = self.args.numa_cores_per_instance
376+
self.args.data_num_intra_threads = inter_threads
351377
elif self.args.socket_id != -1:
352378
if not self.args.num_inter_threads:
353379
self.args.num_inter_threads = 1
354380
if not self.args.num_intra_threads:
355-
self.args.num_intra_threads = \
356-
self.platform_util.num_cores_per_socket \
357-
if self.args.num_cores == -1 else self.args.num_cores
381+
if self.args.num_cores != -1:
382+
self.args.num_intra_threads = self.args.num_cores
383+
elif self.platform_util.cpuset_cpus and \
384+
self.args.socket_id in self.platform_util.cpuset_cpus.keys():
385+
self.args.num_intra_threads = len(self.platform_util.cpuset_cpus[self.args.socket_id])
386+
else:
387+
self.args.num_intra_threads = self.platform_util.num_cores_per_socket
358388
else:
359389
if not self.args.num_inter_threads:
360390
self.args.num_inter_threads = self.platform_util.num_cpu_sockets
361391
if os.environ["MPI_NUM_PROCESSES"] != "None":
362392
self.args.num_inter_threads = 1
363393
if not self.args.num_intra_threads:
364394
if self.args.num_cores == -1:
365-
self.args.num_intra_threads = \
366-
int(self.platform_util.num_cores_per_socket *
367-
self.platform_util.num_cpu_sockets)
395+
if self.platform_util.cpuset_cpus and len(self.platform_util.cpuset_cpus.keys()) > 0:
396+
# Total up the number of cores in the cpuset
397+
self.args.num_intra_threads = sum([len(self.platform_util.cpuset_cpus[socket_id])
398+
for socket_id in self.platform_util.cpuset_cpus.keys()])
399+
else:
400+
self.args.num_intra_threads = \
401+
int(self.platform_util.num_cores_per_socket *
402+
self.platform_util.num_cpu_sockets)
368403
if os.environ["MPI_NUM_PROCESSES"] != "None":
369404
self.args.num_intra_threads = self.platform_util.num_cores_per_socket - 2
370405
else:

benchmarks/common/platform_util.py

+105-5
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,8 @@
3333
CORES_PER_SOCKET_STR_ = "Core(s) per socket"
3434
THREADS_PER_CORE_STR_ = "Thread(s) per core"
3535
LOGICAL_CPUS_STR_ = "CPU(s)"
36+
NUMA_NODE_CPU_RANGE_STR_ = "NUMA node{} CPU(s):"
37+
ONLINE_CPUS_LIST = "On-line CPU(s) list:"
3638

3739

3840
class CPUInfo():
@@ -192,8 +194,16 @@ def __init__(self, args):
192194
self.num_threads_per_core = 0
193195
self.num_logical_cpus = 0
194196
self.num_numa_nodes = 0
197+
198+
# Core list generated by numactl -H in the case where --numa-cores-per-instance is
199+
# being used. It then gets pruned based on the cpuset_cpus, in case docker is
200+
# limiting the cores that the container has access to
195201
self.cpu_core_list = []
196202

203+
# Dictionary generated from the cpuset.cpus file (in linux_init) for the case where
204+
# docker is limiting the number of cores that the container has access to
205+
self.cpuset_cpus = None
206+
197207
os_type = system_platform.system()
198208
if "Windows" == os_type:
199209
self.windows_init()
@@ -204,6 +214,45 @@ def __init__(self, args):
204214
else:
205215
raise ValueError("Unable to determine Operating system type.")
206216

217+
def _get_list_from_string_ranges(self, str_ranges):
218+
"""
219+
Converts a string of numbered ranges (comma separated numbers or ranges) to an
220+
integer list. Duplicates should be removed and the integer list should be
221+
ordered.
222+
For example an input of "3-6,10,0-5" should return [0, 1, 2, 3, 4, 5, 6, 10]
223+
"""
224+
result_list = []
225+
226+
for section in str_ranges.split(","):
227+
if "-" in section:
228+
# Section is a range, so get the start and end values
229+
start, end = section.split("-")
230+
section_list = range(int(start), int(end) + 1)
231+
result_list += section_list
232+
else:
233+
# This section is just a single number, not a range
234+
result_list.append(int(section))
235+
236+
# Remove duplicates
237+
result_list = list(set(result_list))
238+
239+
return result_list
240+
241+
def _get_cpuset(self):
242+
"""
243+
Try to get the cpuset.cpus info, since lscpu does not know if docker has limited
244+
the cpuset accessible to the container
245+
"""
246+
cpuset = ""
247+
cpuset_cpus_file = "/sys/fs/cgroup/cpuset/cpuset.cpus"
248+
if os.path.exists(cpuset_cpus_file):
249+
with open(cpuset_cpus_file, "r") as f:
250+
cpuset = f.read()
251+
252+
if self.args.verbose:
253+
print("cpuset.cpus: {}".format(cpuset))
254+
return cpuset
255+
207256
def linux_init(self):
208257
lscpu_cmd = "lscpu"
209258
try:
@@ -219,6 +268,9 @@ def linux_init(self):
219268
print("Problem getting CPU info: {}".format(e))
220269
sys.exit(1)
221270

271+
core_list_per_node = {}
272+
online_cpus_list = ""
273+
222274
# parse it
223275
for line in cpu_info:
224276
# NUMA_NODES_STR_ = "NUMA node(s)"
@@ -236,28 +288,76 @@ def linux_init(self):
236288
# LOGICAL_CPUS_STR_ = "CPU(s)"
237289
elif line.find(LOGICAL_CPUS_STR_) == 0:
238290
self.num_logical_cpus = int(line.split(":")[1].strip())
291+
# ONLINE_CPUS_LIST = "On-line CPU(s) list"
292+
elif line.find(ONLINE_CPUS_LIST) == 0:
293+
online_cpus_list = line.split(":")[1].strip()
294+
else:
295+
# Get the ranges of cores per node from NUMA node* CPU(s)
296+
for node in range(0, self.num_numa_nodes):
297+
if line.find(NUMA_NODE_CPU_RANGE_STR_.format(str(node))) == 0:
298+
range_for_node = line.split(":")[1].strip()
299+
range_list_for_node = self._get_list_from_string_ranges(range_for_node)
300+
core_list_per_node[node] = range_list_for_node
301+
302+
# Try to get the cpuset.cpus info, since lscpu does not know if the cpuset is limited
303+
cpuset = self._get_cpuset()
304+
if cpuset:
305+
# If the cpuset is the same as the online_cpus_list, then we are using the whole
306+
# machine, so let's avoid unnecessary complexity and don't bother with the cpuset_cpu list
307+
if (online_cpus_list != "" and online_cpus_list != cpuset) or online_cpus_list == "":
308+
self.cpuset_cpus = self._get_list_from_string_ranges(cpuset)
239309

240310
# Uses numactl get the core number for each numa node and adds the cores for each
241-
# node to the cpu_cores_list array
242-
if self.num_numa_nodes > 0:
311+
# node to the cpu_cores_list array. Only do this if the command is trying to use
312+
# numa_cores_per_instance we can't count on numactl being installed otherwise and
313+
# this list is only used for the numactl multi-instance runs.
314+
num_physical_cores = self.num_cpu_sockets * self.num_cores_per_socket
315+
cores_per_node = int(num_physical_cores / self.num_numa_nodes)
316+
if self.num_numa_nodes > 0 and self.args.numa_cores_per_instance is not None:
243317
try:
244318
# Get the list of cores
245-
num_physical_cores = self.num_cpu_sockets * self.num_cores_per_socket
246-
cores_per_node = int(num_physical_cores / self.num_numa_nodes)
247319
cpu_array_command = \
248320
"numactl -H | grep 'node [0-9]* cpus:' |" \
249321
"sed 's/.*node [0-9]* cpus: *//' | head -{0} |cut -f1-{1} -d' '".format(
250322
self.num_numa_nodes, int(cores_per_node))
251323
cpu_array = subprocess.Popen(
252-
cpu_array_command, shell=True, stdout=subprocess.PIPE).stdout.readlines()
324+
cpu_array_command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE).stdout.readlines()
253325

254326
for node_cpus in cpu_array:
255327
node_cpus = str(node_cpus).lstrip("b'").replace("\\n'", " ")
256328
self.cpu_core_list.append([x for x in node_cpus.split(" ") if x != ''])
329+
330+
# If we have the cpuset list, cross check that list with our core list and
331+
# remove cores that are not part of the cpuset list
332+
if self.cpuset_cpus is not None:
333+
for socket, core_list in enumerate(self.cpu_core_list):
334+
self.cpu_core_list[socket] = [x for x in core_list if int(x) in self.cpuset_cpus]
335+
336+
if (self.args.verbose):
337+
print("Core list: {}".format(self.cpu_core_list), flush=True)
338+
257339
except Exception as e:
258340
print("Warning: An error occured when getting the list of cores using '{}':\n {}".
259341
format(cpu_array_command, e))
260342

343+
if self.cpuset_cpus is not None:
344+
# Reformat the cpuset_cpus list so that it's split up by node
345+
for node in core_list_per_node.keys():
346+
core_list_per_node[node] = [x for x in core_list_per_node[node] if x in self.cpuset_cpus]
347+
self.cpuset_cpus = core_list_per_node
348+
349+
# Remove cores that aren't part of the cpu_core_list
350+
for socket in self.cpuset_cpus.keys():
351+
if len(self.cpuset_cpus[socket]) > cores_per_node:
352+
del self.cpuset_cpus[socket][cores_per_node:]
353+
354+
# Remove keys with empty lists (sockets where there are no cores enabled in the cpuset)
355+
self.cpuset_cpus = {k: v for k, v in self.cpuset_cpus.items() if v}
356+
357+
# Update the number of sockets based on the cpuset
358+
if len(self.cpuset_cpus.keys()) > 0:
359+
self.num_cpu_sockets = len(self.cpuset_cpus.keys())
360+
261361
def windows_init(self):
262362
NUM_SOCKETS_STR_ = "DeviceID"
263363
CORES_PER_SOCKET_STR_ = "NumberOfCores"

0 commit comments

Comments
 (0)