From 669f573229d17a09910207712627204075f91274 Mon Sep 17 00:00:00 2001 From: zhixingheyi-tian <1564481943@qq.com> Date: Fri, 12 Jul 2019 00:44:28 +0800 Subject: [PATCH 1/2] initial numa --- .../scala/org/apache/spark/SparkEnv.scala | 4 ++++ .../CoarseGrainedExecutorBackend.scala | 14 ++++++++++--- .../spark/deploy/yarn/ApplicationMaster.scala | 2 +- .../spark/deploy/yarn/ExecutorRunnable.scala | 20 ++++++++++++++++++- .../spark/deploy/yarn/YarnAllocator.scala | 17 +++++++++++++++- .../org/apache/spark/deploy/yarn/config.scala | 6 ++++++ 6 files changed, 57 insertions(+), 6 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala index 72123f2..be88fe6 100644 --- a/core/src/main/scala/org/apache/spark/SparkEnv.scala +++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala @@ -175,6 +175,7 @@ object SparkEnv extends Logging { create( conf, SparkContext.DRIVER_IDENTIFIER, + None, bindAddress, advertiseAddress, Option(port), @@ -193,6 +194,7 @@ object SparkEnv extends Logging { private[spark] def createExecutorEnv( conf: SparkConf, executorId: String, + numaNodeId: Option[String], hostname: String, numCores: Int, ioEncryptionKey: Option[Array[Byte]], @@ -200,6 +202,7 @@ object SparkEnv extends Logging { val env = create( conf, executorId, + numaNodeId, hostname, hostname, None, @@ -217,6 +220,7 @@ object SparkEnv extends Logging { private def create( conf: SparkConf, executorId: String, + numaNodeId: Option[String], bindAddress: String, advertiseAddress: String, port: Option[Int], diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala index 9b62e4b..66db6c2 100644 --- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala +++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala @@ -41,6 +41,7 @@ private[spark] class CoarseGrainedExecutorBackend( override val rpcEnv: RpcEnv, driverUrl: String, executorId: String, + numaNodeId: Option[String], hostname: String, cores: Int, userClassPath: Seq[URL], @@ -177,6 +178,7 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging { private def run( driverUrl: String, executorId: String, + numaNodeId: Option[String], hostname: String, cores: Int, appId: String, @@ -226,10 +228,10 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging { } val env = SparkEnv.createExecutorEnv( - driverConf, executorId, hostname, cores, cfg.ioEncryptionKey, isLocal = false) + driverConf, executorId, numaNodeId, hostname, cores, cfg.ioEncryptionKey, isLocal = false) env.rpcEnv.setupEndpoint("Executor", new CoarseGrainedExecutorBackend( - env.rpcEnv, driverUrl, executorId, hostname, cores, userClassPath, env)) + env.rpcEnv, driverUrl, executorId, numaNodeId, hostname, cores, userClassPath, env)) workerUrl.foreach { url => env.rpcEnv.setupEndpoint("WorkerWatcher", new WorkerWatcher(env.rpcEnv, url)) } @@ -245,6 +247,7 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging { def main(args: Array[String]) { var driverUrl: String = null var executorId: String = null + var numaNodeId: Option[String] = None var hostname: String = null var cores: Int = 0 var appId: String = null @@ -269,6 +272,9 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging { case ("--app-id") :: value :: tail => appId = value argv = tail + case ("--numa-node-id") :: value :: tail => + numaNodeId = 
Some(value.trim) + argv = tail case ("--worker-url") :: value :: tail => // Worker url is used in spark standalone mode to enforce fate-sharing with worker workerUrl = Some(value) @@ -290,7 +296,8 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging { printUsageAndExit() } - run(driverUrl, executorId, hostname, cores, appId, workerUrl, userClassPath) + logInfo(s"[YUQIANG] numaNodeId $numaNodeId") + run(driverUrl, executorId, numaNodeId, hostname, cores, appId, workerUrl, userClassPath) System.exit(0) } @@ -303,6 +310,7 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging { | Options are: | --driver-url | --executor-id + | --numa-node-id | --hostname | --cores | --app-id diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala index 6e35d23..f2debc0 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala @@ -455,7 +455,7 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments) extends val executorMemory = sparkConf.get(EXECUTOR_MEMORY).toInt val executorCores = sparkConf.get(EXECUTOR_CORES) val dummyRunner = new ExecutorRunnable(None, yarnConf, sparkConf, driverUrl, "", - "", executorMemory, executorCores, appId, securityMgr, localResources) + None, "", executorMemory, executorCores, appId, securityMgr, localResources) dummyRunner.launchContextDebugInfo() } diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala index 3f4d236..6a82988 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala @@ -36,6 +36,7 @@ import org.apache.hadoop.yarn.ipc.YarnRPC import org.apache.hadoop.yarn.util.{ConverterUtils, Records} import org.apache.spark.{SecurityManager, SparkConf, SparkException} +import org.apache.spark.deploy.yarn.config._ import org.apache.spark.internal.Logging import org.apache.spark.internal.config._ import org.apache.spark.network.util.JavaUtils @@ -47,6 +48,7 @@ private[yarn] class ExecutorRunnable( sparkConf: SparkConf, masterAddress: String, executorId: String, + numaNodeId: Option[String], hostname: String, executorMemory: Int, executorCores: Int, @@ -197,9 +199,23 @@ private[yarn] class ExecutorRunnable( Seq("--user-class-path", "file:" + absPath) }.toSeq + val numaEnabled = sparkConf.get(SPARK_YARN_NUMA_ENABLED) + + logInfo(s"[YUQIANG] numaEnabled $numaEnabled executorId $executorId") + // No numa binding is needed for the driver.
+ val (numaCtlCommand, numaNodeOpts) = if (numaEnabled && executorId != "" + && numaNodeId.nonEmpty) { + logInfo(s"numaNodeId ${numaNodeId.get}") + val command = s"numactl --cpubind=${numaNodeId.get} --membind=${numaNodeId.get} " + (command, Seq("--numa-node-id", numaNodeId.get)) + } else { + ("", Nil) + } + + logInfo(s"[YUQIANG] numactl command $numaCtlCommand") YarnSparkHadoopUtil.addOutOfMemoryErrorArgument(javaOpts) val commands = prefixEnv ++ - Seq(Environment.JAVA_HOME.$$() + "/bin/java", "-server") ++ + Seq(numaCtlCommand + Environment.JAVA_HOME.$$() + "/bin/java", "-server") ++ javaOpts ++ Seq("org.apache.spark.executor.CoarseGrainedExecutorBackend", "--driver-url", masterAddress, @@ -207,11 +223,13 @@ "--hostname", hostname, "--cores", executorCores.toString, "--app-id", appId) ++ + numaNodeOpts ++ userClassPath ++ Seq( s"1>${ApplicationConstants.LOG_DIR_EXPANSION_VAR}/stdout", s"2>${ApplicationConstants.LOG_DIR_EXPANSION_VAR}/stderr") + logInfo(s"[YUQIANG] container command $commands") // TODO: it would be nicer to just make sure there are no null commands here commands.map(s => if (s == null) "null" else s).toList } diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala index b2d960b..78d0e14 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala @@ -160,6 +160,12 @@ private[yarn] class YarnAllocator( private[yarn] val containerPlacementStrategy = new LocalityPreferredContainerPlacementStrategy(sparkConf, conf, resource, resolver) + // The total number of numa nodes per host + private[yarn] val totalNumaNumber = 2 + // Mapping from host to executor counter; the counter is used in a round-robin fashion to + // determine the numa node id that each executor should bind to. + private[yarn] val hostToNuma = new mutable.HashMap[String, Int]() + /** * Use a different clock for YarnAllocator. This is mainly used for testing. */ @@ -495,11 +501,19 @@ private[yarn] class YarnAllocator( for (container <- containersToUse) { executorIdCounter += 1 val executorHostname = container.getNodeId.getHost + // Set the numa node id that the executor should bind to, round-robin from 0 to + // totalNumaNumber for each host. + // TODO: This is ugly; ideally it should be handled by the resource + // manager (such as YARN). 
+ val preSize = hostToNuma.getOrElseUpdate(executorHostname, 0) + val numaNodeId = (preSize % totalNumaNumber).toString + hostToNuma.put(executorHostname, preSize + 1) + val containerId = container.getId val executorId = executorIdCounter.toString assert(container.getResource.getMemory >= resource.getMemory) logInfo(s"Launching container $containerId on host $executorHostname " + - s"for executor with ID $executorId") + s"for executor with ID $executorId with numa ID $numaNodeId") def updateInternalState(): Unit = synchronized { runningExecutors.add(executorId) @@ -525,6 +539,7 @@ sparkConf, driverUrl, executorId, + Some(numaNodeId), executorHostname, executorMemory, executorCores, diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala index 3ba3ae5..2fe8457 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala @@ -129,6 +129,12 @@ package object config { /* Launcher configuration. */ + private[spark] val SPARK_YARN_NUMA_ENABLED = ConfigBuilder("spark.yarn.numa.enabled") + .doc("Whether to enable numa binding when an executor starts up. It is recommended to " + + "set this to true when persistent memory is enabled.") + .booleanConf + .createWithDefault(false) + private[spark] val WAIT_FOR_APP_COMPLETION = ConfigBuilder("spark.yarn.submit.waitAppCompletion") .doc("In cluster mode, whether to wait for the application to finish before exiting the " + "launcher process.") From 3af6e9a689bb341374787f64c916ccb9d1daf953 Mon Sep 17 00:00:00 2001 From: zhixingheyi-tian <1564481943@qq.com> Date: Fri, 12 Jul 2019 20:37:51 +0800 Subject: [PATCH 2/2] improve --- .../CoarseGrainedExecutorBackend.scala | 2 +- .../spark/deploy/yarn/ExecutorRunnable.scala | 6 +-- .../spark/deploy/yarn/YarnAllocator.scala | 44 ++++++++++++++----- .../org/apache/spark/deploy/yarn/config.scala | 5 +++ 4 files changed, 41 insertions(+), 16 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala index 66db6c2..f9bca47 100644 --- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala +++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala @@ -296,7 +296,7 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging { printUsageAndExit() } - logInfo(s"[YUQIANG] numaNodeId $numaNodeId") + logInfo(s"[NUMACHECK] numaNodeId $numaNodeId") run(driverUrl, executorId, numaNodeId, hostname, cores, appId, workerUrl, userClassPath) System.exit(0) } diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala index 6a82988..2893ead 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala @@ -201,7 +201,7 @@ private[yarn] class ExecutorRunnable( val numaEnabled = sparkConf.get(SPARK_YARN_NUMA_ENABLED) - logInfo(s"[YUQIANG] numaEnabled $numaEnabled executorId $executorId") + logInfo(s"[NUMACHECK] numaEnabled $numaEnabled executorId $executorId") // No numa binding is needed for the driver.
val (numaCtlCommand, numaNodeOpts) = if (numaEnabled && executorId != "" && numaNodeId.nonEmpty) { @@ -212,7 +212,7 @@ private[yarn] class ExecutorRunnable( ("", Nil) } - logInfo(s"[YUQIANG] numactl command $numaCtlCommand") + logInfo(s"[NUMACHECK] numactl command $numaCtlCommand") YarnSparkHadoopUtil.addOutOfMemoryErrorArgument(javaOpts) val commands = prefixEnv ++ Seq(numaCtlCommand + Environment.JAVA_HOME.$$() + "/bin/java", "-server") ++ @@ -229,7 +229,7 @@ private[yarn] class ExecutorRunnable( s"1>${ApplicationConstants.LOG_DIR_EXPANSION_VAR}/stdout", s"2>${ApplicationConstants.LOG_DIR_EXPANSION_VAR}/stderr") - logInfo(s"[YUQIANG] container command $commands") + logInfo(s"[NUMACHECK] container command $commands") // TODO: it would be nicer to just make sure there are no null commands here commands.map(s => if (s == null) "null" else s).toList } diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala index 78d0e14..5474bf9 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala @@ -161,10 +161,11 @@ private[yarn] class YarnAllocator( new LocalityPreferredContainerPlacementStrategy(sparkConf, conf, resource, resolver) // The total number of numa nodes per host - private[yarn] val totalNumaNumber = 2 - // Mapping from host to executor counter; the counter is used in a round-robin fashion to - // determine the numa node id that each executor should bind to. - private[yarn] val hostToNuma = new mutable.HashMap[String, Int]() + private[yarn] val totalNumaNumber = sparkConf.get(SPARK_YARN_NUMA_NUMBER) + // Per-host numa bookkeeping: container-to-node assignments and per-node executor counts + private[yarn] case class NumaInfo(cotainer2numa: mutable.HashMap[String, Int], numaUsed: Array[Int]) + + private[yarn] val hostToNumaInfo = new mutable.HashMap[String, NumaInfo]() /** * Use a different clock for YarnAllocator. This is mainly used for testing. */ @@ -501,13 +502,19 @@ private[yarn] class YarnAllocator( for (container <- containersToUse) { executorIdCounter += 1 val executorHostname = container.getNodeId.getHost - // Set the numa node id that the executor should bind to, round-robin from 0 to - // totalNumaNumber for each host. - // TODO: This is ugly; ideally it should be handled by the resource - // manager (such as YARN). - val preSize = hostToNuma.getOrElseUpdate(executorHostname, 0) - val numaNodeId = (preSize % totalNumaNumber).toString - hostToNuma.put(executorHostname, preSize + 1) + // Set the numa node id that the executor should bind to.
+ // Pick the least-used numa node on this host. + val numaInfo = hostToNumaInfo.getOrElseUpdate(executorHostname, + NumaInfo(new mutable.HashMap[String, Int], new Array[Int](totalNumaNumber))) + val minUsed = numaInfo.numaUsed.min + val newNumaNodeId = numaInfo.numaUsed.indexOf(minUsed) + numaInfo.cotainer2numa.put(container.getId.toString, newNumaNodeId) + numaInfo.numaUsed(newNumaNodeId) += 1 + + val numaNodeId = newNumaNodeId.toString + logInfo(s"numaNodeId: $numaNodeId on host $executorHostname, " + "container: " + container.getId.toString + ", minUsed: " + minUsed) val containerId = container.getId val executorId = executorIdCounter.toString @@ -598,6 +605,17 @@ // there are some exit status' we shouldn't necessarily count against us, but for // now I think its ok as none of the containers are expected to exit. val exitStatus = completedContainer.getExitStatus + + var numaNodeId = -1 + val hostName = hostOpt.getOrElse("nohost") + val numaInfoOp = hostToNumaInfo.get(hostName) + numaInfoOp match { + case Some(numaInfo) => + numaNodeId = numaInfo.cotainer2numa.get(containerId.toString).getOrElse(-1) + if (numaNodeId != -1) numaInfo.numaUsed(numaNodeId) -= 1 + case _ => numaNodeId = -1 + } + val (exitCausedByApp, containerExitReason) = exitStatus match { case ContainerExitStatus.SUCCESS => (false, s"Executor for container $containerId exited because of a YARN event (e.g., " + @@ -621,7 +639,9 @@ private[yarn] class YarnAllocator( failedExecutorsTimeStamps.enqueue(clock.getTimeMillis()) (true, "Container marked as failed: " + containerId + onHostStr + ". Exit status: " + completedContainer.getExitStatus + - ". Diagnostics: " + completedContainer.getDiagnostics) + ". Diagnostics: " + completedContainer.getDiagnostics + + ". numaNodeId: " + numaNodeId + + ". hostName: " + hostName) } if (exitCausedByApp) { diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala index 2fe8457..9ff4e86 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala @@ -135,6 +135,11 @@ package object config { .booleanConf .createWithDefault(false) + private[spark] val SPARK_YARN_NUMA_NUMBER = ConfigBuilder("spark.yarn.numa.number") + .doc("Total number of numa nodes per physical server") + .intConf + .createWithDefault(2) + private[spark] val WAIT_FOR_APP_COMPLETION = ConfigBuilder("spark.yarn.submit.waitAppCompletion") .doc("In cluster mode, whether to wait for the application to finish before exiting the " + "launcher process.")
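The second commit replaces the per-host round-robin counter of the first commit with a least-used selection: each host keeps a usage count per numa node, a newly allocated container is bound to the node with the smallest count, and the count is released again when the container completes. The standalone sketch below illustrates that bookkeeping outside of YarnAllocator; the names NumaTracker, bind and release are illustrative only and do not appear in the patch, and the sketch removes the container entry on release whereas the patch only decrements the usage count.

import scala.collection.mutable

// Minimal sketch of the per-host numa bookkeeping used by the allocator.
object NumaTracker {

  private case class NumaInfo(
      container2numa: mutable.HashMap[String, Int],
      numaUsed: Array[Int])

  private val hostToNumaInfo = new mutable.HashMap[String, NumaInfo]()

  // Bind a container on `host` to the numa node with the fewest bound executors.
  def bind(host: String, containerId: String, totalNumaNumber: Int): Int = {
    val info = hostToNumaInfo.getOrElseUpdate(
      host, NumaInfo(new mutable.HashMap[String, Int], new Array[Int](totalNumaNumber)))
    val nodeId = info.numaUsed.indexOf(info.numaUsed.min)
    info.container2numa.put(containerId, nodeId)
    info.numaUsed(nodeId) += 1
    nodeId
  }

  // Release the slot when the container completes.
  def release(host: String, containerId: String): Unit = {
    hostToNumaInfo.get(host).foreach { info =>
      info.container2numa.remove(containerId).foreach(nodeId => info.numaUsed(nodeId) -= 1)
    }
  }

  def main(args: Array[String]): Unit = {
    (1 to 4).foreach { i =>
      println(s"container_$i -> numa node ${bind("host1", s"container_$i", totalNumaNumber = 2)}")
    }
    release("host1", "container_1")
    // The freed slot on node 0 is reused by the next container.
    println(s"container_5 -> numa node ${bind("host1", "container_5", totalNumaNumber = 2)}")
  }
}

Compared with plain round-robin, this keeps the per-node counts balanced even when executors exit out of order, which is why the completed-container path in YarnAllocator decrements numaUsed.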
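Usage note: numa binding is off by default. With both patches applied it is enabled by setting spark.yarn.numa.enabled=true, and spark.yarn.numa.number should be set to the number of numa nodes per host if it differs from the default of 2. When enabled, each executor container's launch command is prefixed with numactl --cpubind=<id> --membind=<id>, where <id> is the node chosen by YarnAllocator; this implies, although the patches do not state it, that the numactl utility must be available on every NodeManager host.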