Spark Internals (Part 1): Submitting an Application to Yarn (Source Code Walkthrough)
Submitting the job with the submit script
# --deploy-mode defaults to client if not specified
bin/spark-submit \
  --class org.apache.spark.examples.SparkPi \
  --master yarn \
  --deploy-mode cluster \
  ./examples/jars/spark-examples_2.12-2.4.5.jar \
  10

In this walkthrough we mainly focus on Yarn's cluster mode.
exec "${SPARK_HOME}"/bin/spark-class org.apache.spark.deploy.SparkSubmit "$@"
exec ${JAVA_HOME}/bin/java org.apache.spark.deploy.SparkSubmit
override def main(args: Array[String]): Unit = {
  val submit = new SparkSubmit() { self =>
    override def doSubmit(args: Array[String]): Unit = {
      try {
        super.doSubmit(args)
      } catch {
        case e: SparkUserAppException => exitFn(e.exitCode)
      }
    }
  }
  submit.doSubmit(args)
}
Performing the submit operation
def doSubmit(args: Array[String]): Unit = {
  val appArgs = parseArguments(args)
  appArgs.action match {
    case SparkSubmitAction.SUBMIT => submit(appArgs, uninitLog)
    case SparkSubmitAction.KILL => kill(appArgs)
    case SparkSubmitAction.REQUEST_STATUS => requestStatus(appArgs)
    case SparkSubmitAction.PRINT_VERSION => printVersion()
  }
}
Parsing the arguments
protected def parseArguments(args: Array[String]): SparkSubmitArguments = {
  new SparkSubmitArguments(args)
}
var master: String = null
var deployMode: String = null
var mainClass: String = null
var action: SparkSubmitAction = null

// Parse the spark-submit command-line options
parse(args.asJava)
// SparkSubmitArguments.scala
override protected def handle(opt: String, value: String): Boolean = {
  opt match {
    case MASTER =>
      master = value
    case CLASS =>
      mainClass = value
    case DEPLOY_MODE =>
      if (value != "client" && value != "cluster") {
        error("--deploy-mode must be either \"client\" or \"cluster\"")
      }
      deployMode = value
  }
--master yarn                           => master
--deploy-mode cluster                   => deployMode
--class SparkPi (or your own WordCount) => mainClass
Submitting
private def submit(args: SparkSubmitArguments, uninitLog: Boolean): Unit = {
  def doRunMain(): Unit = {
    if (args.proxyUser != null) {
      // proxy-user handling omitted
    } else {
      runMain(args, uninitLog)
    }
  }
  if (args.isStandaloneCluster && args.useRest) {
    // REST-based standalone-cluster submission omitted
  } else {
    doRunMain()
  }
}
Run the main method of the child class with the submitted parameters
private def runMain(args: SparkSubmitArguments, uninitLog: Boolean): Unit = {
  val (childArgs, childClasspath, sparkConf, childMainClass) = prepareSubmitEnvironment(args)

  Thread.currentThread.setContextClassLoader(loader)
  for (jar <- childClasspath) {
    addJarToClasspath(jar, loader)
  }

  var mainClass: Class[_] = null
  mainClass = Utils.classForName(childMainClass)

  val app: SparkApplication = if (classOf[SparkApplication].isAssignableFrom(mainClass)) {
    mainClass.newInstance().asInstanceOf[SparkApplication]
  } else {
    new JavaMainApplication(mainClass)
  }

  app.start(childArgs.toArray, sparkConf)
}
Preparing the submit environment
cluster: childMainClass = org.apache.spark.deploy.yarn.YarnClusterApplication
client:  childMainClass = mainClass
Thread.currentThread.setContextClassLoader(loader)
Load the class by its name
mainClass = Utils.classForName(childMainClass)
Create an instance of the class via reflection and cast it
val app: SparkApplication = mainClass.newInstance().asInstanceOf[SparkApplication]
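The load-instantiate-cast pattern used here can be seen in isolation in the minimal sketch below; the trait and class names (AppLike, DemoApp) are hypothetical stand-ins for SparkApplication and YarnClusterApplication, not real Spark classes.

// A toy trait standing in for SparkApplication.
trait AppLike {
  def start(args: Array[String]): Unit
}

// A toy implementation standing in for YarnClusterApplication.
class DemoApp extends AppLike {
  override def start(args: Array[String]): Unit =
    println(s"DemoApp started with args: ${args.mkString(", ")}")
}

object ReflectionSketch {
  def main(args: Array[String]): Unit = {
    // Load the class by its fully qualified name, as Utils.classForName does.
    val clazz = Class.forName("DemoApp")
    // Instantiate it via reflection and cast to the common trait.
    val app = clazz.getDeclaredConstructor().newInstance().asInstanceOf[AppLike]
    app.start(Array("10"))
  }
}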
Run the start method of childMainClass
app.start(childArgs.toArray, sparkConf)
Running YarnClusterApplication
override def start(args: Array[String], conf: SparkConf): Unit = {
  new Client(new ClientArguments(args), conf).run()
}
Wrap the parameters
Create the client object
yarnClient = YarnClient.createYarnClient

// YarnClient.java
public static YarnClient createYarnClient() {
  YarnClient client = new YarnClientImpl();
  return client;
}
YarnClientImpl holds an ApplicationClientProtocol rmClient, which is used to submit the application to the ResourceManager.
run() – submitting the application
def run(): Unit = {
  this.appId = submitApplication()
}
def submitApplication(): ApplicationId = {
  var appId: ApplicationId = null
  launcherBackend.connect()
  yarnClient.init(hadoopConf)
  yarnClient.start()

  // Get a new application from our RM
  val newApp = yarnClient.createApplication()
  val newAppResponse = newApp.getNewApplicationResponse()
  appId = newAppResponse.getApplicationId()

  // Set up the appropriate contexts to launch our AM
  val containerContext = createContainerLaunchContext(newAppResponse)
  val appContext = createApplicationSubmissionContext(newApp, containerContext)

  // Finally, submit and monitor the application
  yarnClient.submitApplication(appContext)
  launcherBackend.setAppId(appId.toString)
  reportLauncherState(SparkAppHandle.State.SUBMITTED)

  appId
}
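Stripped of Spark's own wrapping, the underlying Hadoop YarnClient call sequence looks roughly like the sketch below; the application name, resource size and the echo command are illustrative placeholders, not what Spark actually submits.

import org.apache.hadoop.yarn.api.records.{ContainerLaunchContext, Resource}
import org.apache.hadoop.yarn.client.api.YarnClient
import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.apache.hadoop.yarn.util.Records
import scala.collection.JavaConverters._

object YarnSubmitSketch {
  def main(args: Array[String]): Unit = {
    val conf = new YarnConfiguration()
    val yarnClient = YarnClient.createYarnClient()
    yarnClient.init(conf)
    yarnClient.start()

    // Ask the ResourceManager for a new application id.
    val newApp = yarnClient.createApplication()
    val appId = newApp.getNewApplicationResponse.getApplicationId

    // Describe the AM container; here just an echo command for illustration.
    val amContainer = Records.newRecord(classOf[ContainerLaunchContext])
    amContainer.setCommands(List("echo hello-from-am").asJava)

    // Fill in the submission context and submit.
    val appContext = newApp.getApplicationSubmissionContext
    appContext.setApplicationName("yarn-submit-sketch")
    appContext.setAMContainerSpec(amContainer)
    appContext.setResource(Resource.newInstance(512, 1))
    yarnClient.submitApplication(appContext)

    println(s"Submitted application $appId")
  }
}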
Configure the JVM launch parameters for the AM container
val amContainer = Records.newRecord(classOf[ContainerLaunchContext])
amContainer.setLocalResources(localResources.asJava)
amContainer.setEnvironment(launchEnv.asJava)
amContainer.setCommands(printableCommands.asJava)
val amClass =
  if (isClusterMode) {
    Utils.classForName("org.apache.spark.deploy.yarn.ApplicationMaster").getName
  } else {
    Utils.classForName("org.apache.spark.deploy.yarn.ExecutorLauncher").getName
  }

cluster: command = bin/java org.apache.spark.deploy.yarn.ApplicationMaster
client:  command = bin/java org.apache.spark.deploy.yarn.ExecutorLauncher
Submit the application to Yarn
Running ApplicationMaster
def main(args: Array[String]): Unit = {
  SignalUtils.registerLogger(log)
  val amArgs = new ApplicationMasterArguments(args)
  master = new ApplicationMaster(amArgs)
  System.exit(master.run())
}
final def run(): Int = {
  doAsUser {
    runImpl()
  }
  exitCode
}
if (isClusterMode) {
  runDriver()
} else {
  runExecutorLauncher()
}
private def runDriver(): Unit = {
  userClassThread = startUserApplication()
  try {
    val sc = ThreadUtils.awaitResult(sparkContextPromise.future,
      Duration(totalWaitTime, TimeUnit.MILLISECONDS))
    if (sc != null) {
      rpcEnv = sc.env.rpcEnv

      val userConf = sc.getConf
      val host = userConf.get("spark.driver.host")
      val port = userConf.get("spark.driver.port").toInt
      registerAM(host, port, userConf, sc.ui.map(_.webUrl))

      val driverRef = rpcEnv.setupEndpointRef(
        RpcAddress(host, port),
        YarnSchedulerBackend.ENDPOINT_NAME)
      createAllocator(driverRef, userConf)
    } else {
      throw new IllegalStateException("User did not initialize spark context!")
    }
    resumeDriver()
    userClassThread.join()
  } catch {
    // exception handling omitted
  } finally {
    resumeDriver()
  }
}
Start the user application
private def startUserApplication(): Thread = {
  val mainMethod = userClassLoader.loadClass(args.userClass)
    .getMethod("main", classOf[Array[String]])

  val userThread = new Thread {
    override def run() {
      try {
        if (!Modifier.isStatic(mainMethod.getModifiers)) {
          logError(s"Could not find static main method in object ${args.userClass}")
          finish(FinalApplicationStatus.FAILED, ApplicationMaster.EXIT_EXCEPTION_USER_CLASS)
        } else {
          mainMethod.invoke(null, userArgs.toArray)
          finish(FinalApplicationStatus.SUCCEEDED, ApplicationMaster.EXIT_SUCCESS)
          logDebug("Done running user class")
        }
      } catch {
        // exception handling omitted
      } finally {
        sparkContextPromise.trySuccess(null)
      }
    }
  }
  userThread.setContextClassLoader(userClassLoader)
  userThread.setName("Driver")
  userThread.start()
  userThread
}
val mainMethod = userClassLoader.loadClass(args.userClass)
  .getMethod("main", classOf[Array[String]])
userThread = new Thread
userThread.setName("Driver")
userThread.start()
mainMethod.invoke
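A self-contained sketch of this reflective main invocation, with a made-up DemoMain standing in for args.userClass, might look like the following:

import java.lang.reflect.Modifier

// Hypothetical stand-in for the user class (args.userClass).
object DemoMain {
  def main(args: Array[String]): Unit = println("user main got: " + args.mkString(" "))
}

object InvokeMainSketch {
  def main(args: Array[String]): Unit = {
    val userArgs = Array("hello", "yarn")
    // Load the user class and look up its static main(String[]) method.
    val mainMethod = getClass.getClassLoader
      .loadClass("DemoMain")
      .getMethod("main", classOf[Array[String]])
    require(Modifier.isStatic(mainMethod.getModifiers), "main must be static")
    // A static method is invoked with a null receiver;
    // userArgs is passed as the single String[] argument of main.
    mainMethod.invoke(null, userArgs)
  }
}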
The AM thread blocks, waiting for the (SparkContext) object to be handed back
val sc = ThreadUtils.awaitResult(sparkContextPromise.future, Duration(totalWaitTime, TimeUnit.MILLISECONDS))
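The rendezvous between the "Driver" thread and the AM thread is just a Promise/Future handoff. A minimal sketch using the standard library (Await.result in place of Spark's internal ThreadUtils.awaitResult) could look like this:

import scala.concurrent.{Await, Promise}
import scala.concurrent.duration._

object PromiseHandoffSketch {
  def main(args: Array[String]): Unit = {
    // Stand-in for sparkContextPromise: the user thread completes it,
    // the AM thread blocks on its future.
    val contextPromise = Promise[Option[String]]()

    val userThread = new Thread(() => {
      // ... user code runs and eventually "initializes" its context ...
      Thread.sleep(500)
      contextPromise.trySuccess(Some("SparkContext stand-in"))
    })
    userThread.setName("Driver")
    userThread.start()

    // The AM thread blocks until the user thread hands over the context (or times out).
    val sc = Await.result(contextPromise.future, 10.seconds)
    println(s"AM received: $sc")
    userThread.join()
  }
}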
Register the AM
val userConf = sc.getConf
val host = userConf.get("spark.driver.host")
val port = userConf.get("spark.driver.port").toInt
registerAM(host, port, userConf, sc.ui.map(_.webUrl))
private def registerAM(
    host: String,
    port: Int,
    _sparkConf: SparkConf,
    uiAddress: Option[String]): Unit = {
  // client = doAsUser { new YarnRMClient() }
  client.register(host, port, yarnConf, _sparkConf, uiAddress, historyAddress)
  registered = true
}
def register(
    driverHost: String,
    driverPort: Int,
    conf: YarnConfiguration,
    sparkConf: SparkConf,
    uiAddress: Option[String],
    uiHistoryAddress: String): Unit = {
  amClient = AMRMClient.createAMRMClient()
  amClient.init(conf)
  amClient.start()
  synchronized {
    amClient.registerApplicationMaster(driverHost, driverPort, trackingUrl)
    registered = true
  }
}
RPC communication: the AM requests resources from the RM
rpcEnv = sc.env.rpcEnv
val driverRef = rpcEnv.setupEndpointRef(
  RpcAddress(host, port),
  YarnSchedulerBackend.ENDPOINT_NAME)
createAllocator(driverRef, userConf)
private def createAllocator(driverRef: RpcEndpointRef, _sparkConf: SparkConf): Unit = {
  val appId = client.getAttemptId().getApplicationId().toString()
  val driverUrl = RpcEndpointAddress(driverRef.address.host, driverRef.address.port,
    CoarseGrainedSchedulerBackend.ENDPOINT_NAME).toString

  allocator = client.createAllocator(
    yarnConf,
    _sparkConf,
    driverUrl,
    driverRef,
    securityMgr,
    localResources)

  rpcEnv.setupEndpoint("YarnAM", new AMEndpoint(rpcEnv, driverRef))

  allocator.allocateResources()
}
def createAllocator(
    conf: YarnConfiguration,
    sparkConf: SparkConf,
    driverUrl: String,
    driverRef: RpcEndpointRef,
    securityMgr: SecurityManager,
    localResources: Map[String, LocalResource]): YarnAllocator = {
  require(registered, "Must register AM before creating allocator.")
  new YarnAllocator(driverUrl, driverRef, conf, sparkConf, amClient, getAttemptId(),
    securityMgr, localResources, new SparkRackResolver())
}
def allocateResources(): Unit = synchronized {
  val allocateResponse = amClient.allocate(progressIndicator)

  val allocatedContainers = allocateResponse.getAllocatedContainers()
  allocatorBlacklistTracker.setNumClusterNodes(allocateResponse.getNumClusterNodes)

  if (allocatedContainers.size > 0) {
    handleAllocatedContainers(allocatedContainers.asScala)
  }
}
Get the list of available resources
val allocateResponse = amClient.allocate(progressIndicator)
val allocatedContainers = allocateResponse.getAllocatedContainers()
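Viewed purely through the Hadoop AMRMClient API, the register-then-allocate flow that YarnAllocator drives boils down to roughly the following sketch; the host, port, resource size and polling loop are illustrative only, and it would only actually succeed when run inside a real AM container.

import org.apache.hadoop.yarn.api.records.{Container, Priority, Resource}
import org.apache.hadoop.yarn.client.api.AMRMClient
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest
import org.apache.hadoop.yarn.conf.YarnConfiguration
import scala.collection.JavaConverters._

object AmAllocateSketch {
  def main(args: Array[String]): Unit = {
    val conf = new YarnConfiguration()
    val amClient = AMRMClient.createAMRMClient[ContainerRequest]()
    amClient.init(conf)
    amClient.start()

    // Register this AM with the ResourceManager (host/port/tracking URL are illustrative).
    amClient.registerApplicationMaster("localhost", 0, "")

    // Ask for one 1 GiB / 1 vcore container anywhere in the cluster.
    val capability = Resource.newInstance(1024, 1)
    amClient.addContainerRequest(new ContainerRequest(capability, null, null, Priority.newInstance(1)))

    // Heartbeat until the RM hands back containers; executors would be launched on them.
    var containers: Seq[Container] = Seq.empty
    while (containers.isEmpty) {
      val response = amClient.allocate(0.1f)
      containers = response.getAllocatedContainers.asScala.toSeq
      Thread.sleep(1000)
    }
    println(s"Allocated ${containers.size} container(s)")
  }
}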
Process the available resources. Locality levels: process-local, node-local, rack-local, any.
handleAllocatedContainers(allocatedContainers.asScala)
matchContainerToRequest   // match each allocated container to an outstanding request
runAllocatedContainers(containersToUse)
private def runAllocatedContainers(containersToUse: ArrayBuffer[Container]): Unit = {
  for (container <- containersToUse) {
    if (runningExecutors.size() < targetNumExecutors) {
      numExecutorsStarting.incrementAndGet()
      if (launchContainers) {
        launcherPool.execute(new Runnable {
          override def run(): Unit = {
            try {
              new ExecutorRunnable(
                Some(container),
                conf,
                sparkConf,
                driverUrl,
                executorId,
                executorHostname,
                executorMemory,
                executorCores,
                appAttemptId.getApplicationId.toString,
                securityMgr,
                localResources
              ).run()
              updateInternalState()
            } catch {
              // exception handling omitted
            }
          }
        })
      } else {
        // For test only
        updateInternalState()
      }
    } else {
      // enough executors are already running; skip this container
    }
  }
}
var rpc: YarnRPC = YarnRPC.create(conf)
var nmClient: NMClient = _
def run(): Unit = {
  logDebug("Starting Executor Container")
  nmClient = NMClient.createNMClient()
  nmClient.init(conf)
  nmClient.start()
  startContainer()
}
def startContainer(): java.util.Map[String, ByteBuffer] = {
  val ctx = Records.newRecord(classOf[ContainerLaunchContext])
    .asInstanceOf[ContainerLaunchContext]
  val env = prepareEnvironment().asJava

  ctx.setLocalResources(localResources.asJava)
  ctx.setEnvironment(env)

  val commands = prepareCommand()
  ctx.setCommands(commands.asJava)

  // Send the start request to the ContainerManager
  try {
    nmClient.startContainer(container.get, ctx)
  } catch {
    // exception handling omitted
  }
}
command = bin/java org.apache.spark.executor.CoarseGrainedExecutorBackend
nmClient.startContainer(container.get, ctx)
CoarseGrainedExecutorBackend
def main(args: Array[String]) {
  run(driverUrl, executorId, hostname, cores, appId, workerUrl, userClassPath)
}
private def run(
    driverUrl: String,
    executorId: String,
    hostname: String,
    cores: Int,
    appId: String,
    workerUrl: Option[String],
    userClassPath: Seq[URL]) {

  SparkHadoopUtil.get.runAsSparkUser { () =>
    val env = SparkEnv.createExecutorEnv(
      driverConf, executorId, hostname, cores, cfg.ioEncryptionKey, isLocal = false)

    env.rpcEnv.setupEndpoint("Executor", new CoarseGrainedExecutorBackend(
      env.rpcEnv, driverUrl, executorId, hostname, cores, userClassPath, env))

    env.rpcEnv.awaitTermination()
  }
}
// NettyRpcEnv.scala
override def setupEndpoint(name: String, endpoint: RpcEndpoint): RpcEndpointRef = {
  dispatcher.registerRpcEndpoint(name, endpoint)
}
// Dispatcher.scala
def registerRpcEndpoint(name: String, endpoint: RpcEndpoint): NettyRpcEndpointRef = {
  val addr = RpcEndpointAddress(nettyEnv.address, name)
  val endpointRef = new NettyRpcEndpointRef(nettyEnv.conf, addr, nettyEnv)
  synchronized {
    if (stopped) {
      throw new IllegalStateException("RpcEnv has been stopped")
    }
    if (endpoints.putIfAbsent(name, new EndpointData(name, endpoint, endpointRef)) != null) {
      throw new IllegalArgumentException(s"There is already an RpcEndpoint called $name")
    }
    val data = endpoints.get(name)
    endpointRefs.put(data.endpoint, data.ref)
    receivers.offer(data) // for the OnStart message
  }
  endpointRef
}
private class EndpointData(
    val name: String,
    val endpoint: RpcEndpoint,
    val ref: NettyRpcEndpointRef) {
  val inbox = new Inbox(ref, endpoint)
}
We can see that when the endpoint is constructed, the Inbox sends itself an OnStart message:

// OnStart should be the first message to process
inbox.synchronized {
  messages.add(OnStart)
}
/**
 * Process stored messages.
 */
def process(dispatcher: Dispatcher): Unit = {
  while (true) {
    safelyCall(endpoint) {
      message match {
        case RpcMessage(_sender, content, context) =>
          endpoint.receiveAndReply(context).applyOrElse[Any, Unit](content, { msg =>
            throw new SparkException(s"Unsupported message $message from ${_sender}")
          })
        case OnStart =>
          endpoint.onStart()
          if (!endpoint.isInstanceOf[ThreadSafeRpcEndpoint]) {
            inbox.synchronized {
              if (!stopped) {
                enableConcurrent = true
              }
            }
          }
        case OnStop =>
          val activeThreads = inbox.synchronized { inbox.numActiveThreads }
          dispatcher.removeRpcEndpointRef(endpoint)
          endpoint.onStop()
      }
    }
  }
}
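Conceptually, each EndpointData owns an Inbox whose queue is seeded with OnStart, so OnStart is always the first message a new endpoint processes. The toy sketch below (not Spark's actual classes) mimics that ordering with a plain blocking queue:

import java.util.concurrent.LinkedBlockingQueue

object InboxSketch {
  sealed trait Message
  case object OnStart extends Message
  final case class Rpc(content: String) extends Message

  // A toy endpoint; real Spark endpoints implement onStart/receive on RpcEndpoint.
  class ToyEndpoint {
    def onStart(): Unit = println("endpoint started")
    def receive(content: String): Unit = println(s"received: $content")
  }

  def main(args: Array[String]): Unit = {
    val endpoint = new ToyEndpoint
    val messages = new LinkedBlockingQueue[Message]()
    // Like Inbox: OnStart is enqueued at construction, before any RPC message.
    messages.put(OnStart)
    messages.put(Rpc("RegisterExecutor"))

    // The dispatcher loop drains the queue; OnStart is therefore handled first.
    while (!messages.isEmpty) {
      messages.take() match {
        case OnStart      => endpoint.onStart()
        case Rpc(content) => endpoint.receive(content)
      }
    }
  }
}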
override def onStart() {
  rpcEnv.asyncSetupEndpointRefByURI(driverUrl).flatMap { ref =>
    // This is a very fast action so we can use "ThreadUtils.sameThread"
    driver = Some(ref)
    ref.ask[Boolean](RegisterExecutor(executorId, self, hostname, cores, extractLogUrls))
  }
}
// DriverEndpoint
override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
  case RegisterExecutor(executorId, executorRef, hostname, cores, logUrls) =>
    if (executorDataMap.contains(executorId)) {
      executorRef.send(RegisterExecutorFailed("Duplicate executor ID: " + executorId))
      context.reply(true)
    } else if (scheduler.nodeBlacklist.contains(hostname)) {
      executorRef.send(RegisterExecutorFailed(s"Executor is blacklisted: $executorId"))
      context.reply(true)
    } else {
      addressToExecutorId(executorAddress) = executorId
      totalCoreCount.addAndGet(cores)
      totalRegisteredExecutors.addAndGet(1)
      val data = new ExecutorData(executorRef, executorAddress, hostname, cores, cores, logUrls)
      executorRef.send(RegisteredExecutor)
      context.reply(true)
      listenerBus.post(
        SparkListenerExecutorAdded(System.currentTimeMillis(), executorId, data))
      makeOffers()
    }
}
override def receive: PartialFunction[Any, Unit] = {
  case RegisteredExecutor =>
    logInfo("Successfully registered with driver")
    try {
      executor = new Executor(executorId, hostname, env, userClassPath, isLocal = false)
    } catch {
      case NonFatal(e) =>
        exitExecutor(1, "Unable to create executor due to " + e.getMessage, e)
    }

  case LaunchTask(data) =>
    if (executor == null) {
      exitExecutor(1, "Received LaunchTask command but executor was null")
    } else {
      val taskDesc = TaskDescription.decode(data.value)
      logInfo("Got assigned task " + taskDesc.taskId)
      executor.launchTask(this, taskDesc)
    }
}
// Start worker thread pool
private val threadPool = {
  val threadFactory = new ThreadFactoryBuilder()
    .setDaemon(true)
    .setNameFormat("Executor task launch worker-%d")
    .setThreadFactory(new ThreadFactory {
      // thread name will be set by ThreadFactoryBuilder
      override def newThread(r: Runnable): Thread = new UninterruptibleThread(r, "unused")
    })
    .build()
  Executors.newCachedThreadPool(threadFactory).asInstanceOf[ThreadPoolExecutor]
}
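As a usage note, Executor.launchTask simply wraps each task in a Runnable (TaskRunner) and hands it to this pool. A minimal sketch of the same pattern, assuming Guava's ThreadFactoryBuilder is on the classpath, is:

import java.util.concurrent.{Executors, ThreadPoolExecutor, TimeUnit}
import com.google.common.util.concurrent.ThreadFactoryBuilder

object TaskPoolSketch {
  // Cached pool with daemon threads named like Spark's task-launch workers.
  private val threadPool: ThreadPoolExecutor = {
    val threadFactory = new ThreadFactoryBuilder()
      .setDaemon(true)
      .setNameFormat("Executor task launch worker-%d")
      .build()
    Executors.newCachedThreadPool(threadFactory).asInstanceOf[ThreadPoolExecutor]
  }

  def main(args: Array[String]): Unit = {
    // Stand-in for a TaskRunner: each launched task is just a Runnable given to the pool.
    val fakeTask: Runnable = () => println(s"running task on ${Thread.currentThread().getName}")
    threadPool.execute(fakeTask)
    threadPool.shutdown()
    threadPool.awaitTermination(5, TimeUnit.SECONDS)
  }
}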
Summary

Walking through this much source code can be a bit dizzying, but the overall picture is simple: spark-submit starts a SparkSubmit JVM, which in cluster mode runs YarnClusterApplication to submit the application to the ResourceManager; Yarn then launches the ApplicationMaster, which starts the user class in a "Driver" thread, registers with the RM, requests containers, and launches CoarseGrainedExecutorBackend processes that register back with the driver. At this point, the entire flow of submitting an application to Yarn is complete.