
Commit 3af6e9a

improve
1 parent 669f573 commit 3af6e9a

4 files changed, +41 -16 lines changed

core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala

+1 -1
@@ -296,7 +296,7 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
       printUsageAndExit()
     }

-    logInfo(s"[YUQIANG] numaNodeId $numaNodeId")
+    logInfo(s"[NUMACHECK] numaNodeId $numaNodeId")
     run(driverUrl, executorId, numaNodeId, hostname, cores, appId, workerUrl, userClassPath)
     System.exit(0)
   }

resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala

+3 -3
@@ -201,7 +201,7 @@ private[yarn] class ExecutorRunnable(

     val numaEnabled = sparkConf.get(SPARK_YARN_NUMA_ENABLED)

-    logInfo(s"[YUQIANG] numaEnabled $numaEnabled executorId $executorId")
+    logInfo(s"[NUMACHECK] numaEnabled $numaEnabled executorId $executorId")
     // Don't need numa binding for driver.
     val (numaCtlCommand, numaNodeOpts) = if (numaEnabled && executorId != "<executorId>"
       && numaNodeId.nonEmpty) {
@@ -212,7 +212,7 @@ private[yarn] class ExecutorRunnable(
       ("", Nil)
     }

-    logInfo(s"[YUQIANG] numactl command $numaCtlCommand")
+    logInfo(s"[NUMACHECK] numactl command $numaCtlCommand")
     YarnSparkHadoopUtil.addOutOfMemoryErrorArgument(javaOpts)
     val commands = prefixEnv ++
       Seq(numaCtlCommand + Environment.JAVA_HOME.$$() + "/bin/java", "-server") ++
@@ -229,7 +229,7 @@ private[yarn] class ExecutorRunnable(
       s"1>${ApplicationConstants.LOG_DIR_EXPANSION_VAR}/stdout",
       s"2>${ApplicationConstants.LOG_DIR_EXPANSION_VAR}/stderr")

-    logInfo(s"[YUQIANG] container command $commands")
+    logInfo(s"[NUMACHECK] container command $commands")
     // TODO: it would be nicer to just make sure there are no null commands here
     commands.map(s => if (s == null) "null" else s).toList
   }
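
Note: these hunks only show the numactl prefix being prepended to the executor's java launch line; the branch that actually builds numaCtlCommand lies outside this diff. A minimal sketch of one plausible construction, assuming the usual numactl CPU/memory pinning flags (the exact flags are an assumption, not taken from this commit):

// Sketch only: a plausible shape for the NUMA-binding prefix used in ExecutorRunnable.
// "--cpunodebind/--membind" is an assumption; the real branch is not part of this diff.
object NumaCtlPrefixSketch {
  def numaCtlCommand(numaEnabled: Boolean, numaNodeId: String): String =
    if (numaEnabled && numaNodeId.nonEmpty) {
      s"numactl --cpunodebind=$numaNodeId --membind=$numaNodeId "
    } else {
      ""  // empty prefix leaves the java launch line unchanged
    }

  def main(args: Array[String]): Unit = {
    // The diff prepends the prefix directly to the java binary path.
    println(numaCtlCommand(numaEnabled = true, numaNodeId = "1") + "$JAVA_HOME/bin/java -server ...")
  }
}

When numaEnabled is false or no node id is known, the empty prefix leaves the launch command untouched, matching the ("", Nil) fallback in the hunk above.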

resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala

+32 -12
@@ -161,10 +161,11 @@ private[yarn] class YarnAllocator(
     new LocalityPreferredContainerPlacementStrategy(sparkConf, conf, resource, resolver)

   // The total number of numa node
-  private[yarn] val totalNumaNumber = 2
-  // Mapping from host to executor counter, we use the counter with a round-robin mode to
-  // determine the numa node id that the executor should bind.
-  private[yarn] val hostToNuma = new mutable.HashMap[String, Int]()
+  private[yarn] val totalNumaNumber = sparkConf.get(SPARK_YARN_NUMA_NUMBER)
+  // Mapping from host to executor counter
+  private[yarn] case class NumaInfo(cotainer2numa: mutable.HashMap[String, Int], numaUsed: Array[Int])
+
+  private[yarn] val hostToNumaInfo = new mutable.HashMap[String, NumaInfo]()

   /**
    * Use a different clock for YarnAllocator. This is mainly used for testing.
@@ -501,13 +502,19 @@ private[yarn] class YarnAllocator(
     for (container <- containersToUse) {
       executorIdCounter += 1
       val executorHostname = container.getNodeId.getHost
-      // Setting the numa id that the executor should binding. Just round robin from 0 to
-      // totalNumaNumber for each host.
-      // TODO: This is very ugly, however this is should be processed in resource
-      // manager(such as yarn).
-      val preSize = hostToNuma.getOrElseUpdate(executorHostname, 0)
-      val numaNodeId = (preSize % totalNumaNumber).toString
-      hostToNuma.put(executorHostname, preSize + 1)
+      // Setting the numa id that the executor should binding.
+      // new numaid binding method
+      val numaInfo = hostToNumaInfo.getOrElseUpdate(executorHostname,
+        NumaInfo(new mutable.HashMap[String, Int], new Array[Int](totalNumaNumber)))
+      val minUsed = numaInfo.numaUsed.min
+      val newNumaNodeId = numaInfo.numaUsed.indexOf(minUsed)
+      numaInfo.cotainer2numa.put(container.getId.toString, newNumaNodeId)
+      numaInfo.numaUsed(newNumaNodeId) += 1
+
+      val numaNodeId = newNumaNodeId.toString
+      logInfo(s"numaNodeId: $numaNodeId on host $executorHostname," +
+        "container: " + container.getId.toString +
+        ", minUsed: " + minUsed)

       val containerId = container.getId
       val executorId = executorIdCounter.toString
@@ -598,6 +605,17 @@ private[yarn] class YarnAllocator(
         // there are some exit status' we shouldn't necessarily count against us, but for
         // now I think its ok as none of the containers are expected to exit.
         val exitStatus = completedContainer.getExitStatus
+
+        var numaNodeId = -1
+        val hostName = hostOpt.getOrElse("nohost")
+        val numaInfoOp = hostToNumaInfo.get(hostName)
+        numaInfoOp match {
+          case Some(numaInfo) =>
+            numaNodeId = numaInfo.cotainer2numa.get(containerId.toString).getOrElse(-1)
+            if(-1 != numaNodeId) numaInfo.numaUsed(numaNodeId) -= 1
+          case _ => numaNodeId = -1
+        }
+
         val (exitCausedByApp, containerExitReason) = exitStatus match {
           case ContainerExitStatus.SUCCESS =>
             (false, s"Executor for container $containerId exited because of a YARN event (e.g., " +
@@ -621,7 +639,9 @@ private[yarn] class YarnAllocator(
             failedExecutorsTimeStamps.enqueue(clock.getTimeMillis())
             (true, "Container marked as failed: " + containerId + onHostStr +
               ". Exit status: " + completedContainer.getExitStatus +
-              ". Diagnostics: " + completedContainer.getDiagnostics)
+              ". Diagnostics: " + completedContainer.getDiagnostics +
+              ". numaNodeId: " + numaNodeId +
+              ". hostName: " + hostName)

         }
         if (exitCausedByApp) {
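
Taken together, the YarnAllocator hunks replace the previous per-host round-robin with a least-used policy: every host keeps a usage counter per NUMA node plus a container-to-node map, a new container is bound to the node with the smallest counter, and the counter is decremented when the container completes. A self-contained sketch of that bookkeeping (object and method names here are illustrative, not from the commit):

import scala.collection.mutable

// Stand-alone model of the bookkeeping added to YarnAllocator; illustrative only.
object NumaLeastUsedSketch {
  case class NumaInfo(container2numa: mutable.HashMap[String, Int], numaUsed: Array[Int])

  val totalNumaNumber = 2  // would come from spark.yarn.numa.number
  val hostToNumaInfo = new mutable.HashMap[String, NumaInfo]()

  // Bind a new container to the least-loaded NUMA node on its host.
  def assign(host: String, containerId: String): Int = {
    val info = hostToNumaInfo.getOrElseUpdate(host,
      NumaInfo(new mutable.HashMap[String, Int], new Array[Int](totalNumaNumber)))
    val nodeId = info.numaUsed.indexOf(info.numaUsed.min)
    info.container2numa.put(containerId, nodeId)
    info.numaUsed(nodeId) += 1
    nodeId
  }

  // Decrement the node's usage when the container completes (the commit decrements
  // numaUsed the same way, but leaves the container entry in its map).
  def release(host: String, containerId: String): Unit =
    for (info <- hostToNumaInfo.get(host); nodeId <- info.container2numa.remove(containerId)) {
      info.numaUsed(nodeId) -= 1
    }

  def main(args: Array[String]): Unit = {
    println(assign("host1", "container_1"))  // 0
    println(assign("host1", "container_2"))  // 1
    release("host1", "container_1")
    println(assign("host1", "container_3"))  // 0 again: node 0 has the fewest containers
  }
}

Running the example binds the first two containers to nodes 0 and 1; after the first container is released, the next one goes back to node 0 because it is the least loaded again.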

resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala

+5
@@ -135,6 +135,11 @@ package object config {
     .booleanConf
     .createWithDefault(false)

+  private[spark] val SPARK_YARN_NUMA_NUMBER = ConfigBuilder("spark.yarn.numa.number")
+    .doc("Total number of numanodes in physical server")
+    .intConf
+    .createWithDefault(2)
+
   private[spark] val WAIT_FOR_APP_COMPLETION = ConfigBuilder("spark.yarn.submit.waitAppCompletion")
     .doc("In cluster mode, whether to wait for the application to finish before exiting the " +
       "launcher process.")
