Merge pull request #1 from zhixingheyi-tian/numa-improve
Numa improve
zhixingheyi-tian authored Jul 12, 2019
2 parents e8e1c69 + 3af6e9a commit d0fbc48
Showing 6 changed files with 83 additions and 7 deletions.
4 changes: 4 additions & 0 deletions core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -175,6 +175,7 @@ object SparkEnv extends Logging {
create(
conf,
SparkContext.DRIVER_IDENTIFIER,
None,
bindAddress,
advertiseAddress,
Option(port),
@@ -193,13 +194,15 @@
private[spark] def createExecutorEnv(
conf: SparkConf,
executorId: String,
numaNodeId: Option[String],
hostname: String,
numCores: Int,
ioEncryptionKey: Option[Array[Byte]],
isLocal: Boolean): SparkEnv = {
val env = create(
conf,
executorId,
numaNodeId,
hostname,
hostname,
None,
@@ -217,6 +220,7 @@
private def create(
conf: SparkConf,
executorId: String,
numaNodeId: Option[String],
bindAddress: String,
advertiseAddress: String,
port: Option[Int],
core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -41,6 +41,7 @@ private[spark] class CoarseGrainedExecutorBackend(
override val rpcEnv: RpcEnv,
driverUrl: String,
executorId: String,
numaNodeId: Option[String],
hostname: String,
cores: Int,
userClassPath: Seq[URL],
@@ -177,6 +178,7 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
private def run(
driverUrl: String,
executorId: String,
numaNodeId: Option[String],
hostname: String,
cores: Int,
appId: String,
@@ -226,10 +228,10 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
}

val env = SparkEnv.createExecutorEnv(
driverConf, executorId, hostname, cores, cfg.ioEncryptionKey, isLocal = false)
driverConf, executorId, numaNodeId, hostname, cores, cfg.ioEncryptionKey, isLocal = false)

env.rpcEnv.setupEndpoint("Executor", new CoarseGrainedExecutorBackend(
env.rpcEnv, driverUrl, executorId, hostname, cores, userClassPath, env))
env.rpcEnv, driverUrl, executorId, numaNodeId, hostname, cores, userClassPath, env))
workerUrl.foreach { url =>
env.rpcEnv.setupEndpoint("WorkerWatcher", new WorkerWatcher(env.rpcEnv, url))
}
@@ -245,6 +247,7 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
def main(args: Array[String]) {
var driverUrl: String = null
var executorId: String = null
var numaNodeId: Option[String] = None
var hostname: String = null
var cores: Int = 0
var appId: String = null
@@ -269,6 +272,9 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
case ("--app-id") :: value :: tail =>
appId = value
argv = tail
case ("--numa-node-id") :: value :: tail =>
numaNodeId = Some(value.trim)
argv = tail
case ("--worker-url") :: value :: tail =>
// Worker url is used in spark standalone mode to enforce fate-sharing with worker
workerUrl = Some(value)
@@ -290,7 +296,8 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
printUsageAndExit()
}

run(driverUrl, executorId, hostname, cores, appId, workerUrl, userClassPath)
logInfo(s"[NUMACHECK] numaNodeId $numaNodeId")
run(driverUrl, executorId, numaNodeId, hostname, cores, appId, workerUrl, userClassPath)
System.exit(0)
}

@@ -303,6 +310,7 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
| Options are:
| --driver-url <driverUrl>
| --executor-id <executorId>
| --numa-node-id <numaNodeId>
| --hostname <hostname>
| --cores <cores>
| --app-id <appid>
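The backend changes above add a "--numa-node-id" option that is parsed in main and threaded through run into SparkEnv.createExecutorEnv. Below is a minimal, self-contained sketch (not part of the commit) of how such a flag can be pulled out of an executor argv with the same list-pattern-matching style; the object name and all argument values are illustrative only.

object NumaArgParseSketch {
  // Walk the argv list and return the value following "--numa-node-id",
  // mirroring the `case ("--numa-node-id") :: value :: tail` branch above.
  def parseNumaNodeId(args: List[String]): Option[String] = args match {
    case "--numa-node-id" :: value :: _ => Some(value.trim)
    case _ :: tail => parseNumaNodeId(tail)
    case Nil => None
  }

  def main(args: Array[String]): Unit = {
    val argv = List(
      "--driver-url", "spark://CoarseGrainedScheduler@driver-host:43211",
      "--executor-id", "1",
      "--numa-node-id", "0",
      "--hostname", "worker-host",
      "--cores", "4",
      "--app-id", "application_1562900000000_0001")
    println(parseNumaNodeId(argv)) // prints Some(0)
  }
}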
resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -455,7 +455,7 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments) extends
val executorMemory = sparkConf.get(EXECUTOR_MEMORY).toInt
val executorCores = sparkConf.get(EXECUTOR_CORES)
val dummyRunner = new ExecutorRunnable(None, yarnConf, sparkConf, driverUrl, "<executorId>",
"<hostname>", executorMemory, executorCores, appId, securityMgr, localResources)
None, "<hostname>", executorMemory, executorCores, appId, securityMgr, localResources)
dummyRunner.launchContextDebugInfo()
}

resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
@@ -36,6 +36,7 @@ import org.apache.hadoop.yarn.ipc.YarnRPC
import org.apache.hadoop.yarn.util.{ConverterUtils, Records}

import org.apache.spark.{SecurityManager, SparkConf, SparkException}
import org.apache.spark.deploy.yarn.config._
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config._
import org.apache.spark.network.util.JavaUtils
@@ -47,6 +48,7 @@ private[yarn] class ExecutorRunnable(
sparkConf: SparkConf,
masterAddress: String,
executorId: String,
numaNodeId: Option[String],
hostname: String,
executorMemory: Int,
executorCores: Int,
@@ -197,21 +199,37 @@
Seq("--user-class-path", "file:" + absPath)
}.toSeq

val numaEnabled = sparkConf.get(SPARK_YARN_NUMA_ENABLED)

logInfo(s"[NUMACHECK] numaEnabled $numaEnabled executorId $executorId")
// No NUMA binding is needed for the driver (the AM's dummy runner passes "<executorId>").
val (numaCtlCommand, numaNodeOpts) = if (numaEnabled && executorId != "<executorId>"
    && numaNodeId.nonEmpty) {
  logInfo(s"numaNodeId ${numaNodeId.get}")
  val command = s"numactl --cpubind=${numaNodeId.get} --membind=${numaNodeId.get} "
  (command, Seq("--numa-node-id", numaNodeId.get))
} else {
("", Nil)
}

logInfo(s"[NUMACHECK] numactl command $numaCtlCommand")
YarnSparkHadoopUtil.addOutOfMemoryErrorArgument(javaOpts)
val commands = prefixEnv ++
Seq(Environment.JAVA_HOME.$$() + "/bin/java", "-server") ++
Seq(numaCtlCommand + Environment.JAVA_HOME.$$() + "/bin/java", "-server") ++
javaOpts ++
Seq("org.apache.spark.executor.CoarseGrainedExecutorBackend",
"--driver-url", masterAddress,
"--executor-id", executorId,
"--hostname", hostname,
"--cores", executorCores.toString,
"--app-id", appId) ++
numaNodeOpts ++
userClassPath ++
Seq(
s"1>${ApplicationConstants.LOG_DIR_EXPANSION_VAR}/stdout",
s"2>${ApplicationConstants.LOG_DIR_EXPANSION_VAR}/stderr")

logInfo(s"[NUMACHECK] container command $commands")
// TODO: it would be nicer to just make sure there are no null commands here
commands.map(s => if (s == null) "null" else s).toList
}
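When spark.yarn.numa.enabled is set and the allocator has assigned a node id, the container launch command built above is prefixed with numactl so both the executor JVM's CPUs and its memory are pinned to that node, and the same id is forwarded via "--numa-node-id". The following standalone sketch reproduces just that prefix logic and the rough shape of the resulting command; the JAVA_HOME path and node id are illustrative, and the real command additionally carries javaOpts, --driver-url, --hostname, and the other options shown in the diff.

object NumaCommandSketch {
  // Build the numactl prefix exactly when NUMA binding is enabled and a node id
  // has been assigned; otherwise fall back to a plain JVM invocation.
  def launchCommand(numaEnabled: Boolean, numaNodeId: Option[String], javaHome: String): Seq[String] = {
    val (numaCtlPrefix, numaOpts) = numaNodeId match {
      case Some(id) if numaEnabled =>
        (s"numactl --cpubind=$id --membind=$id ", Seq("--numa-node-id", id))
      case _ => ("", Seq.empty[String])
    }
    Seq(numaCtlPrefix + javaHome + "/bin/java", "-server",
      "org.apache.spark.executor.CoarseGrainedExecutorBackend") ++ numaOpts
  }

  def main(args: Array[String]): Unit = {
    // Prints: List(numactl --cpubind=0 --membind=0 /usr/lib/jvm/java/bin/java, -server,
    //              org.apache.spark.executor.CoarseGrainedExecutorBackend, --numa-node-id, 0)
    println(launchCommand(numaEnabled = true, Some("0"), "/usr/lib/jvm/java"))
  }
}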
resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
@@ -160,6 +160,13 @@ private[yarn] class YarnAllocator(
private[yarn] val containerPlacementStrategy =
new LocalityPreferredContainerPlacementStrategy(sparkConf, conf, resource, resolver)

// The total number of NUMA nodes per physical server
private[yarn] val totalNumaNumber = sparkConf.get(SPARK_YARN_NUMA_NUMBER)
// Per-host NUMA bookkeeping: which NUMA node each container is bound to,
// plus a usage counter per node
private[yarn] case class NumaInfo(container2numa: mutable.HashMap[String, Int], numaUsed: Array[Int])

private[yarn] val hostToNumaInfo = new mutable.HashMap[String, NumaInfo]()

/**
* Use a different clock for YarnAllocator. This is mainly used for testing.
*/
@@ -495,11 +502,25 @@
for (container <- containersToUse) {
executorIdCounter += 1
val executorHostname = container.getNodeId.getHost
// Pick the NUMA node this executor should bind to: the least-used node on the host.
val numaInfo = hostToNumaInfo.getOrElseUpdate(executorHostname,
  NumaInfo(new mutable.HashMap[String, Int], new Array[Int](totalNumaNumber)))
val minUsed = numaInfo.numaUsed.min
val newNumaNodeId = numaInfo.numaUsed.indexOf(minUsed)
numaInfo.container2numa.put(container.getId.toString, newNumaNodeId)
numaInfo.numaUsed(newNumaNodeId) += 1

val numaNodeId = newNumaNodeId.toString
logInfo(s"numaNodeId: $numaNodeId on host $executorHostname," +
"container: " + container.getId.toString +
", minUsed: " + minUsed)

val containerId = container.getId
val executorId = executorIdCounter.toString
assert(container.getResource.getMemory >= resource.getMemory)
logInfo(s"Launching container $containerId on host $executorHostname " +
s"for executor with ID $executorId")
s"for executor with ID $executorId with numa ID $numaNodeId")

def updateInternalState(): Unit = synchronized {
runningExecutors.add(executorId)
@@ -525,6 +546,7 @@
sparkConf,
driverUrl,
executorId,
Some(numaNodeId),
executorHostname,
executorMemory,
executorCores,
@@ -583,6 +605,17 @@
// there are some exit status' we shouldn't necessarily count against us, but for
// now I think its ok as none of the containers are expected to exit.
val exitStatus = completedContainer.getExitStatus

// Release the NUMA slot that was reserved for this container, if any.
var numaNodeId = -1
val hostName = hostOpt.getOrElse("nohost")
hostToNumaInfo.get(hostName) match {
  case Some(numaInfo) =>
    numaNodeId = numaInfo.container2numa.getOrElse(containerId.toString, -1)
    if (numaNodeId != -1) numaInfo.numaUsed(numaNodeId) -= 1
  case None =>
}

val (exitCausedByApp, containerExitReason) = exitStatus match {
case ContainerExitStatus.SUCCESS =>
(false, s"Executor for container $containerId exited because of a YARN event (e.g., " +
Expand All @@ -606,7 +639,9 @@ private[yarn] class YarnAllocator(
failedExecutorsTimeStamps.enqueue(clock.getTimeMillis())
(true, "Container marked as failed: " + containerId + onHostStr +
". Exit status: " + completedContainer.getExitStatus +
". Diagnostics: " + completedContainer.getDiagnostics)
". Diagnostics: " + completedContainer.getDiagnostics +
". numaNodeId: " + numaNodeId +
". hostName: " + hostName)

}
if (exitCausedByApp) {
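The allocator changes above implement a simple per-host least-loaded policy: each newly launched container is bound to the host's least-used NUMA node, and the node's counter is decremented again when the container completes. The self-contained sketch below isolates that bookkeeping so it can be read and tested on its own; host names and container ids are made up, and the naming only loosely follows the fields added above.

import scala.collection.mutable

object NumaPlacementSketch {
  case class NumaInfo(container2numa: mutable.HashMap[String, Int], numaUsed: Array[Int])

  val totalNumaNumber = 2 // matches the default of spark.yarn.numa.number
  val hostToNumaInfo = new mutable.HashMap[String, NumaInfo]()

  // Assign the least-used NUMA node on this host to the container.
  def assign(host: String, containerId: String): Int = {
    val info = hostToNumaInfo.getOrElseUpdate(
      host, NumaInfo(new mutable.HashMap[String, Int], new Array[Int](totalNumaNumber)))
    val nodeId = info.numaUsed.indexOf(info.numaUsed.min)
    info.container2numa.put(containerId, nodeId)
    info.numaUsed(nodeId) += 1
    nodeId
  }

  // Decrement the usage counter of the node the completed container was bound to.
  def release(host: String, containerId: String): Unit =
    for (info <- hostToNumaInfo.get(host); nodeId <- info.container2numa.get(containerId))
      info.numaUsed(nodeId) -= 1

  def main(args: Array[String]): Unit = {
    println(assign("host1", "container_1")) // prints 0
    println(assign("host1", "container_2")) // prints 1: second container goes to the other node
    release("host1", "container_1")
    println(assign("host1", "container_3")) // prints 0: the freed node is reused
  }
}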
resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala
@@ -129,6 +129,17 @@ package object config {

/* Launcher configuration. */

private[spark] val SPARK_YARN_NUMA_ENABLED = ConfigBuilder("spark.yarn.numa.enabled")
  .doc("Whether to enable NUMA binding when executors start up. Recommended to be true " +
    "when persistent memory is enabled.")
.booleanConf
.createWithDefault(false)

private[spark] val SPARK_YARN_NUMA_NUMBER = ConfigBuilder("spark.yarn.numa.number")
  .doc("Total number of NUMA nodes on each physical server.")
.intConf
.createWithDefault(2)

private[spark] val WAIT_FOR_APP_COMPLETION = ConfigBuilder("spark.yarn.submit.waitAppCompletion")
.doc("In cluster mode, whether to wait for the application to finish before exiting the " +
"launcher process.")
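A hypothetical way to turn the feature on for a cluster whose worker nodes have two NUMA sockets is sketched below; the same keys can equally be passed with --conf on spark-submit. The object name is illustrative.

import org.apache.spark.SparkConf

object NumaConfSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .set("spark.yarn.numa.enabled", "true") // prefix executor JVMs with numactl
      .set("spark.yarn.numa.number", "2")     // NUMA nodes per physical server
    println(conf.get("spark.yarn.numa.enabled")) // prints true
  }
}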
