技术架构定位
任务执行与资源管理系统是Spark分布式计算引擎的核心支柱,犹如一座城市的交通与能源系统,它们协同工作,确保计算任务高效流转,资源得到合理分配。在Spark的整体架构中,这一系统连接着上层的DAG调度器与底层的计算资源,将抽象的数据转换计划转化为实际的计算过程。
任务执行与资源管理系统在Spark架构中处于承上启下的关键位置。上游,它接收来自TaskScheduler的任务提交请求,将抽象的计算指令转化为实际的执行过程;下游,它直接与计算资源(如CPU、内存和存储)交互,确保任务能够高效地利用这些资源完成计算。这一系统就像是工厂的生产车间和资源调度中心,将设计蓝图转化为实际产品,同时合理分配各类资源,保证生产过程的顺畅进行。
在分布式计算的复杂环境中,任务执行与资源管理面临诸多挑战:如何在异构的计算节点上保证任务的一致执行?如何处理节点故障和任务失败?如何在多任务并行的情况下合理分配有限资源?如何平衡计算与存储的资源需求?Spark通过精心设计的执行模型和资源管理策略,成功应对了这些挑战,实现了高效、可靠的分布式计算。
本文将深入剖析Spark任务执行与资源管理的内部机制,从执行环境、任务生命周期和资源分配策略等多个角度,揭示这一系统如何支撑Spark的高性能计算能力,为读者提供全面而深入的技术理解。
任务执行环境
任务执行环境是Spark任务运行的基础设施,它为任务提供了必要的运行时支持和上下文信息。理解这一环境,就像了解一个生物的栖息地,能够帮助我们更深入地理解任务如何在分布式系统中生存和工作。
TaskRunner与执行上下文
TaskRunner是任务执行的核心组件,它如同一位工厂车间的技术工人,负责按照"图纸"(Task对象)完成具体的生产工作。每个TaskRunner实例负责执行一个特定的Task,并管理与该任务相关的资源和状态。
TaskRunner的工作流程是典型的生命周期管理:首先准备执行环境,包括设置任务上下文和分配内存资源;然后执行任务主体逻辑;最后进行清理工作,释放资源并报告结果。这一流程设计确保了任务能够在隔离的环境中安全执行,同时保证资源的合理使用和及时回收。
class TaskRunner(
execBackend: ExecutorBackend,
val taskId: Long,
val attemptNumber: Int,
taskName: String,
serializedTask: ByteBuffer,
val taskMemoryManager: TaskMemoryManager) extends Runnable {
@volatile private var killed = false
@volatile private var task: Task[Any] = _
@volatile private var startGCTime: Long = _
def run(): Unit = {
val threadMXBean = ManagementFactory.getThreadMXBean
val taskMemoryManager = new TaskMemoryManager(env.memoryManager, taskId)
val deserializeStartTime = System.currentTimeMillis()
val deserializeStartCpuTime = if (threadMXBean.isCurrentThreadCpuTimeSupported) {
threadMXBean.getCurrentThreadCpuTime
} else 0L
Thread.currentThread.setContextClassLoader(replClassLoader)
val ser = env.closureSerializer.newInstance()
try {
// 反序列化任务
task = ser.deserialize[Task[Any]](
serializedTask, Thread.currentThread.getContextClassLoader)
task.localProperties = taskDescription.properties
task.setTaskMemoryManager(taskMemoryManager)
// 创建任务上下文并执行任务
val taskContext = new TaskContextImpl(
taskId,
attemptNumber,
taskAttemptId,
partitionId,
localProperties,
metrics)
TaskContext.setTaskContext(taskContext)
// 执行任务,获取结果
val value = task.run(taskAttemptId, attemptNumber)
// 成功完成任务,返回结果
execBackend.statusUpdate(taskId, TaskState.FINISHED, serializedResult)
} catch {
// 处理各种异常情况
} finally {
// 清理资源
TaskContext.unset()
taskMemoryManager.cleanUpAllAllocatedMemory()
}
}
}
TaskContext是任务执行的上下文环境,它提供了任务执行所需的各种信息和服务接口。TaskContext可以类比为工作场所的环境和工具箱,它让工人(TaskRunner)能够了解当前工作的状态,使用合适的工具,并在需要时获取帮助。
TaskContext包含丰富的上下文信息:如stageId、partitionId、attemptNumber等标识信息,用于定位任务在整个执行计划中的位置;localProperties传递用户定义的属性,支持自定义行为;各种监听器机制允许注册任务完成或失败时的回调函数,实现灵活的异步处理。
def addTaskCompletionListener(listener: TaskCompletionListener): TaskContext = {
completionListeners += listener
this
}
def addTaskFailureListener(listener: TaskFailureListener): TaskContext = {
failureListeners += listener
this
}
这些监听器机制使得任务能够在执行过程中注册后续处理逻辑,例如资源清理、结果处理或异常恢复等。通过面向事件的设计,Spark实现了任务执行过程中的灵活控制和异步协作,提高了系统的响应性和健壮性。
Executor与任务管理
Executor是Spark执行系统的核心组件,它管理一组计算资源(如CPU核心和内存),并负责在这些资源上执行任务。如果将整个Spark集群比作一个工厂,那么Executor就是工厂中的生产线,它集中了一定的生产资源,并按照调度安排完成具体产品的制造。
Executor的主要职责包括:管理和分配计算资源;接收并执行任务;管理任务的生命周期;维护缓存数据;向Driver报告执行状态和结果。这一设计使得计算能够在数据附近高效进行,同时通过本地缓存提升数据访问速度,体现了Spark “将计算移动到数据"的核心理念。
private[spark] class Executor(
executorId: String,
executorHostname: String,
env: SparkEnv,
userClassPath: Seq[URL] = Nil,
isLocal: Boolean = false) extends Logging {
// 核心线程池,用于执行任务
private val threadPool = ThreadUtils.newDaemonCachedThreadPool("Executor task launch worker")
// 当前运行的任务
private val runningTasks = new ConcurrentHashMap[Long, TaskRunner]
// 任务执行计数器
private val numRunningTasks = new AtomicInteger(0)
// 启动任务
def launchTask(context: ExecutorBackend, taskDescription: TaskDescription): Unit = {
val taskId = taskDescription.taskId
val tr = new TaskRunner(context, taskDescription)
runningTasks.put(taskId, tr)
threadPool.execute(tr)
numRunningTasks.incrementAndGet()
}
// 杀死任务
def killTask(taskId: Long, interruptThread: Boolean): Unit = {
val tr = runningTasks.get(taskId)
if (tr != null) {
tr.kill(interruptThread)
}
}
}
Executor采用线程池模型管理任务执行,每个Task由一个专门的线程执行,这使得Executor能够并行处理多个任务,充分利用多核CPU资源。同时,任务间通过TaskMemoryManager进行内存隔离,避免了相互干扰,提高了稳定性和执行效率。
ExecutorBackend是Executor与外部系统(主要是Driver和集群管理器)通信的接口。它负责接收来自Driver的命令(如启动任务、杀死任务)并转发给Executor执行;同时,将Executor上任务的状态和结果报告给Driver。这种通信接口的抽象使得Executor能够适应不同的运行环境,如本地模式、Standalone集群、YARN、Kubernetes等。
private[spark] class CoarseGrainedExecutorBackend(
override val rpcEnv: RpcEnv,
driverUrl: String,
executorId: String,
hostname: String,
cores: Int,
userClassPath: Seq[URL],
env: SparkEnv)
extends ExecutorBackend with ThreadSafeRpcEndpoint {
// 处理来自Driver的消息
override def receive: PartialFunction[Any, Unit] = {
case LaunchTask(data) =>
val taskDesc = TaskDescription.decode(data.value)
executor.launchTask(this, taskDesc)
case KillTask(taskId, _, interruptThread) =>
executor.killTask(taskId, interruptThread)
// 处理其他消息类型...
}
// 向Driver报告任务状态
override def statusUpdate(taskId: Long, state: TaskState, data: ByteBuffer): Unit = {
val msg = StatusUpdate(executorId, taskId, state, data)
driver match {
case Some(driverRef) => driverRef.send(msg)
case None => logWarning(s"Drop $msg because has not yet connected to driver")
}
}
}
这种执行环境的设计体现了Spark对分布式计算的深入理解:通过明确的组件职责划分,实现计算逻辑与资源管理的分离;通过灵活的通信机制,支持不同集群环境下的任务协调;通过事件驱动的监听器模式,实现任务执行过程中的异步处理和资源管理。这一架构既保证了执行效率,又提供了足够的灵活性和扩展性,是Spark能够适应各种复杂计算场景的重要基础。
资源分配机制
资源分配是分布式计算系统面临的核心挑战之一。在Spark中,资源分配贯穿于应用启动、任务调度和执行的全过程,就像一个精密的分配系统,确保每项任务都能获得所需的计算资源,同时最大化整体资源利用率。
ExecutorAllocationManager实现
Spark的动态资源分配通过ExecutorAllocationManager组件实现,它就像是一个智能的资源管理员,根据工作负载的变化自动调整执行器数量,实现资源使用的弹性伸缩。这一机制解决了静态资源分配的局限性,避免了资源浪费或不足的问题。
ExecutorAllocationManager的核心工作原理是基于任务积压量和执行器空闲时间来作出扩缩决策。当系统检测到大量待处理任务而执行器资源不足时,会按照策略逐步增加执行器数量;反之,当执行器长时间处于空闲状态时,则会逐步释放这些资源,以便其他应用使用。
private[spark] class ExecutorAllocationManager(
client: ExecutorAllocationClient,
listenerBus: LiveListenerBus,
conf: SparkConf) extends Logging {
// 动态调整参数
private val minExecutors = conf.get(DYN_ALLOCATION_MIN_EXECUTORS)
private val maxExecutors = conf.get(DYN_ALLOCATION_MAX_EXECUTORS)
private val initialExecutors = conf.get(DYN_ALLOCATION_INITIAL_EXECUTORS)
// 执行器请求和释放策略
private val schedulerBacklogTimeout = conf.get(DYN_ALLOCATION_SCHEDULER_BACKLOG_TIMEOUT)
private val sustainedSchedulerBacklogTimeout =
conf.get(DYN_ALLOCATION_SUSTAINED_SCHEDULER_BACKLOG_TIMEOUT)
private val executorIdleTimeout = conf.get(DYN_ALLOCATION_EXECUTOR_IDLE_TIMEOUT)
// 资源扩展因子
private val numExecutorsTarget = new AtomicInteger(initialExecutors)
// 启动定期调整线程
private val executor = ThreadUtils.newDaemonSingleThreadScheduledExecutor(
"spark-dynamic-executor-allocation")
def start(): Unit = {
executor.scheduleWithFixedDelay(
new Runnable {
override def run(): Unit = {
try {
schedule()
} catch {
case e: Throwable => logWarning(s"Uncaught exception in thread ${Thread.currentThread().getName}", e)
}
}
},
0, executorAllocationRate, TimeUnit.SECONDS)
}
private def schedule(): Unit = {
updateAndSyncNumExecutorsTarget()
}
private def updateAndSyncNumExecutorsTarget(): Unit = {
val maxNeeded = maxNumExecutorsNeeded()
if (maxNeeded > numExecutorsTarget.get()) {
// 需要增加执行器
val numExecutorsToAdd = math.min(
maxExecutors - numExecutorsTarget.get(),
math.max(1, maxNeeded - numExecutorsTarget.get()))
addExecutors(numExecutorsToAdd)
} else if (maxNeeded < numExecutorsTarget.get()) {
// 需要减少执行器
val numExecutorsToRemove = math.min(
numExecutorsTarget.get() - minExecutors,
numExecutorsTarget.get() - maxNeeded)
removeExecutors(numExecutorsToRemove)
}
}
}
执行器扩展策略是一个渐进过程,而非一次性调整到目标值。这种设计考虑了任务波动的不稳定性,避免了过度反应导致的资源抖动。具体而言,系统使用多级超时机制:当检测到任务积压时,先等待短暂时间(schedulerBacklogTimeout)再添加少量执行器;如果积压持续存在,则按照更激进的策略增加更多执行器(sustainedSchedulerBacklogTimeout)。
执行器回收策略同样采用了渐进式设计。系统会跟踪每个执行器的空闲时间,当超过设定阈值(executorIdleTimeout)时将其标记为可移除。为避免频繁的资源波动,回收过程也会考虑多种因素,如执行器运行时间、缓存数据和最小执行器保留数等。
动态资源分配不仅提高了集群资源利用率,还简化了用户配置工作,用户无需精确估算所需资源,系统会根据实际负载自动调整。这一机制特别适合负载变化较大的场景,如交互式查询和多租户环境,大大提升了Spark的适应性和用户友好性。
任务资源分配与调度
在Executor级别分配好计算资源后,Spark还需要解决如何将这些资源分配给具体任务的问题。这一过程由TaskScheduler负责,它就像是工厂的生产调度员,根据任务特性和资源状况,决定谁先执行、在哪里执行,以及分配多少资源。
任务调度遵循一系列原则,如数据本地性优先、公平共享和资源约束等。数据本地性优先原则尝试将任务调度到数据所在位置,减少数据传输开销;公平共享确保多个作业能够按比例分享资源,避免资源独占;资源约束则保证任务获得足够的执行资源,避免资源饥饿或浪费。
private def resourceOfferSingleTaskSet(
taskSet: TaskSetManager,
maxLocality: TaskLocality,
availableCpus: Array[Int],
availableResources: Array[Map[String, Buffer[String]]],
tasks: IndexedSeq[Int]): Boolean = {
var launchedTask = false
// 按照本地性级别尝试分配任务
for (i <- 0 until availableCpus.length if !launchedTask) {
val execId = s"executor_${i}"
val host = s"host_${i}"
taskSet.resourceOffer(execId, host, maxLocality, availableResources(i)) match {
case Some(task) =>
// 找到匹配任务,分配资源
launchedTask = true
taskIdToTaskSetManager(task.taskId) = taskSet
taskIdToExecutorId(task.taskId) = execId
executorIdToRunningTaskIds(execId).add(task.taskId)
availableCpus(i) -= 1
// 提交任务给执行器
backend.reviveOffers()
case None => // 没有适合的任务
}
}
launchedTask
}
任务级别的资源分配涉及多个维度,如CPU核心、内存和自定义资源(如GPU)等。Spark通过TaskDescription携带资源需求信息,确保每个任务获得所需的计算资源。对于特殊资源如GPU,Spark还提供了细粒度的资源分配机制,支持任务级别的资源申请和释放。
private[spark] case class TaskDescription(
taskId: Long,
attemptNumber: Int,
executorId: String,
name: String,
index: Int,
partitionId: Int,
addedFiles: Map[String, Long],
addedJars: Map[String, Long],
properties: Properties,
resources: Map[String, ResourceInformation]) {
// 资源请求信息
def resourcesAsJavaMap: java.util.Map[String, ResourceInformation] = {
resources.asJava
}
}
任务执行期间的资源管理同样重要。TaskRunner负责管理任务内存分配,确保任务不会超出限制;同时,监控任务执行情况,在必要时终止异常任务,释放资源。这种精细的资源管理机制保证了任务执行的稳定性和效率,是Spark能够处理大规模数据的重要保障。
资源分配与调度是一个动态平衡的过程,系统需要在公平性、效率和资源利用率之间找到最佳折衷。Spark通过灵活的调度策略、精细的资源隔离和动态的资源调整,成功地实现了这一平衡,为用户提供了既高效又易用的分布式计算环境。
任务状态管理
任务状态管理是Spark执行系统的核心环节,它跟踪任务的生命周期,处理成功与失败情况,确保整个计算过程的可靠性和一致性。这一机制就像是工厂中的质量控制系统,监控每个产品的生产状态,及时处理异常情况,保证最终产品的质量。
事件通知系统
Spark的任务状态管理建立在事件驱动架构之上,通过一系列事件和监听器,实现了任务状态的追踪和响应。这种设计类似于现代GUI系统中的事件处理机制,使得系统各组件能够灵活地响应状态变化,而无需紧密耦合。
LiveListenerBus是Spark事件系统的核心,它维护了一系列监听器,并将事件分发给对应的监听器处理。任务相关事件如TaskSubmitted、TaskStarted、TaskCompleted等,会在任务生命周期的不同阶段触发,并通过LiveListenerBus传递给各个监听器。
private[spark] class LiveListenerBus(conf: SparkConf) extends SparkListenerBus {
// 事件队列和处理线程
private val queues = new CopyOnWriteArrayList[(String, AsyncEventQueue)]
def post(event: SparkListenerEvent): Unit = {
if (shouldPostEventToQueues(event)) {
val it = queues.iterator()
while (it.hasNext()) {
it.next()._2.post(event)
}
}
}
// 添加监听器
def addListener(listener: SparkListener, queue: String = "DEFAULT"): Unit = {
val targetQueue = queues.asScala.find(_._1 == queue)
targetQueue.foreach(_._2.addListener(listener))
}
}
监听器机制使得任务状态管理与业务逻辑分离,系统可以根据不同需求注册各类监听器,如进度跟踪、故障检测、性能监控等。这种松耦合设计提高了系统的可扩展性和可测试性,同时简化了核心执行逻辑。
TaskContext中的完成和失败监听器是另一种事件机制,它们直接绑定到特定任务,在任务完成或失败时自动触发。这种机制特别适合执行任务特定的清理和后处理工作,如资源释放、结果处理或异常恢复等。
// 注册任务完成监听器
taskContext.addTaskCompletionListener { context =>
// 任务完成后执行的清理工作
blockManager.releaseAllLocksForTask(context.taskAttemptId())
}
// 注册任务失败监听器
taskContext.addTaskFailureListener { (context, error) =>
// 任务失败后执行的恢复工作
logError(s"Task ${context.taskAttemptId()} failed: $error")
metrics.incFailedTasks()
}
通过这些事件通知机制,Spark实现了任务状态的实时追踪和灵活响应,为高效的任务管理奠定了基础。
任务重试与故障恢复
分布式环境中的故障是常态而非异常,一个健壮的系统必须能够优雅地处理各类故障并保证计算的正确性。Spark的任务重试和故障恢复机制正是为应对这一挑战而设计的,它就像汽车的安全气囊和保险杠,在发生碰撞时保护乘客安全。
TaskSetManager是任务重试的核心组件,它跟踪每个任务的执行状态,并在检测到任务失败时启动重试逻辑。根据失败原因和重试次数,系统会采取不同的恢复策略,如重试特定任务、重试整个Stage或标记作业失败等。
private[scheduler] class TaskSetManager(
sched: TaskSchedulerImpl,
val taskSet: TaskSet,
val maxTaskFailures: Int,
blacklistTracker: Option[BlacklistTracker] = None) extends Logging {
// 任务失败计数器
private val taskAttempts = new Array[Int](taskSet.tasks.length)
private val successful = new Array[Boolean](taskSet.tasks.length)
// 处理任务失败
def handleFailedTask(tid: Long, state: TaskState, reason: TaskFailedReason): Unit = {
val info = taskInfos(tid)
val index = info.index
// 更新任务状态
info.markFinished(state, clock.getTimeMillis())
// 根据失败原因处理
reason match {
case e: ExceptionFailure =>
// 任务执行异常,可能需要重试
taskAttempts(index) += 1
if (taskAttempts(index) >= maxTaskFailures) {
abort("Task %d failed %d times, exceeding max attempts: %s"
.format(index, maxTaskFailures, e.toErrorString))
} else {
// 重新提交任务
addPendingTask(index)
}
case e: TaskCommitDenied =>
// 提交被拒绝,可能是因为Stage已经完成
logInfo(s"Task ${info.id} denied commit: ${e.toErrorString}")
case e: ExecutorLostFailure =>
// 执行器丢失,需要重新调度任务
logInfo(s"Task ${info.id} failed because executor ${e.executorId} is lost")
if (successful(index) || shouldRescheduleAfterExecutorLost(index)) {
addPendingTask(index)
}
// 处理其他失败原因...
}
}
}
Spark的故障恢复策略根据故障类型和影响范围采取不同措施,既保证了系统健壮性,又避免了不必要的资源浪费:
任务级故障(如算法错误、内存溢出)通常通过重试特定任务解决,系统会记录失败次数,当达到阈值时终止重试,避免无限循环; 执行器级故障(如节点崩溃、网络断开)需要重新调度该执行器上的所有任务,同时考虑数据本地性和黑名单机制,避免将任务调度到问题节点; Stage级故障(如Shuffle数据丢失)可能需要重新计算整个Stage,这是RDD血缘关系发挥作用的地方,系统能够基于不变的父RDD重新生成丢失的数据; 应用级故障(如Driver崩溃)则需要依靠外部检查点或持久化RDD进行恢复,这通常需要用户显式配置。
为了提高故障恢复的效率,Spark引入了多项优化:黑名单机制避免将任务调度到频繁失败的节点;推测执行(Speculative Execution)通过复制执行异常慢的任务,减少长尾影响;本地性放宽策略在重试时逐步降低本地性要求,加快任务启动;数据本地性重建通过缓存和存储层优化,减少重新计算的代价。
通过这些精心设计的故障处理机制,Spark能够在充满不确定性的分布式环境中提供稳定可靠的计算服务,即使面对各种故障和异常,也能保持优雅运行并最终完成计算任务。
内存管理实现
内存是现代计算系统中最宝贵的资源之一,对于处理大规模数据的Spark而言,高效的内存管理更是性能与稳定性的关键。Spark的内存管理系统就像一位精明的资源管家,通过一系列精细策略,确保有限的内存资源得到最有效的利用。
MemoryManager体系与动态调整
Spark的内存管理以MemoryManager为核心,它负责整个Executor内存资源的分配和回收。MemoryManager的设计经历了从静态分配到统一管理的演进,反映了对内存资源更精细化管理的追求。
在统一内存管理模型中,Executor的内存分为两大类:执行内存(Execution Memory)和存储内存(Storage Memory)。执行内存用于Shuffle、Join和聚合等计算操作;存储内存则用于缓存RDD、广播变量和临时数据。这两部分内存并非严格隔离,而是可以相互借用,实现资源的动态平衡。
private[spark] class UnifiedMemoryManager(
conf: SparkConf,
val maxHeapMemory: Long,
val maxOffHeapMemory: Long,
val numCores: Int)
extends MemoryManager(
conf,
numCores,
maxHeapMemory,
maxOffHeapMemory) {
// 初始内存分配比例
private val storageFraction = conf.get(MEMORY_STORAGE_FRACTION)
private val minMemory = math.min(maxHeapMemory, maxOffHeapMemory)
// 创建内存池
private val storagePool = new StorageMemoryPool(this)
private val executionPool = new ExecutionMemoryPool(this)
// 设置初始大小
storagePool.setPoolSize(maxHeapMemory * storageFraction)
executionPool.setPoolSize(maxHeapMemory * (1 - storageFraction))
// 动态借用内存
def acquireExecutionMemory(
numBytes: Long,
taskAttemptId: Long,
memoryMode: MemoryMode): Long = {
// 尝试从执行内存池获取
var acquired = executionPool.acquireMemory(numBytes, taskAttemptId)
// 如果不足,尝试从存储内存池借用
if (acquired < numBytes) {
val storageMemoryToFree = Math.min(numBytes - acquired, storagePool.memoryFree)
if (storageMemoryToFree > 0) {
storagePool.shrinkPoolToFreeSpace(storageMemoryToFree)
executionPool.incrementPoolSize(storageMemoryToFree)
acquired += executionPool.acquireMemory(numBytes - acquired, taskAttemptId)
}
}
acquired
}
}
内存动态调整是统一内存管理的核心特性。当某类内存不足时,系统可以从另一类内存借用空间,前提是被借用的内存有空闲。这种机制特别有利于适应不同类型的工作负载:计算密集型应用能够获得更多执行内存,而缓存密集型应用则能够利用更多存储内存,无需用户手动调整配置。
借用机制并非无限制的,系统会保留一定比例的初始分配(通过spark.memory.storageFraction配置),以确保核心功能不受影响。例如,即使存储内存紧张,也不会驱逐所有缓存数据;同样,执行内存也会保留必要的工作空间,避免过度借用导致的性能抖动。
任务间的内存隔离是另一重要特性。每个任务有自己的TaskMemoryManager,通过该组件申请和释放内存,避免了任务间的相互干扰。任务内存的分配遵循公平共享原则,系统会尝试平均分配资源,并在必要时回收闲置内存,保证活跃任务的内存需求。
private[memory] class ExecutionMemoryPool(lock: Object) extends MemoryPool(lock) {
// 任务ID到内存使用量的映射
private val memoryForTask = new HashMap[Long, Long]
// 申请内存实现
def acquireMemory(numBytes: Long, taskAttemptId: Long): Long = {
lock.synchronized {
// 计算任务可用的最大内存
val numActiveTasks = memoryForTask.keys.size
val curMem = memoryForTask.getOrElse(taskAttemptId, 0L)
val maxPoolMem = poolSize / numActiveTasks
val maxMemForTask = maxPoolMem - curMem
// 分配内存,不超过可用上限
val toGrant = math.min(numBytes, maxMemForTask)
if (toGrant > 0) {
memoryForTask(taskAttemptId) = curMem + toGrant
_memoryUsed += toGrant
}
toGrant
}
}
}
MemoryManager的设计体现了Spark对分布式内存管理的深入理解。通过统一内存模型、动态调整和任务隔离等机制,系统实现了内存资源的高效利用和灵活管理,为处理大规模数据提供了坚实的基础。
存储系统交互
内存管理与存储系统的交互是Spark中一个重要的协作关系,它决定了数据如何在内存、磁盘和网络间流动,直接影响系统的性能和稳定性。这种关系就像是工厂中的生产与仓储部门协作,需要紧密配合才能实现高效运作。
BlockManager是Spark存储系统的核心组件,它管理内存和磁盘上的数据块,为计算和缓存提供统一的存储接口。当RDD分区、广播变量或Shuffle数据需要存储时,BlockManager会根据存储级别(StorageLevel)和可用资源,决定将数据放在内存、磁盘,或者两者兼有。
private[spark] class BlockManager(
executorId: String,
memoryManager: MemoryManager,
blockTransferService: BlockTransferService,
conf: SparkConf) extends BlockDataManager with Logging {
// 内存和磁盘存储
private val memoryStore = new MemoryStore(this, memoryManager)
private val diskStore = new DiskStore(this, conf)
// 获取数据块
def getLocalValues(blockId: BlockId): Option[BlockResult] = {
val m = master.getLocations(blockId).filter(_.executorId == executorId)
if (m.isDefined) {
val status = m.get.head
val memResult = if (status.memSize > 0) memoryStore.getValues(blockId) else None
val diskResult = if (status.diskSize > 0) diskStore.getBytes(blockId) else None
if (memResult.isDefined) {
Some(BlockResult(memResult.get, DataReadMethod.Memory, status.memSize))
} else if (diskResult.isDefined) {
// 从磁盘读取,可能需要反序列化
Some(BlockResult(diskResult.get, DataReadMethod.Disk, status.diskSize))
} else {
None
}
} else {
None
}
}
// 存储数据块
def putBlockData(blockId: BlockId, data: ManagedBuffer, level: StorageLevel): Boolean = {
putBytes(blockId, new ChunkedByteBuffer(data.nioByteBuffer()), level)
}
def putBytes(blockId: BlockId, bytes: ChunkedByteBuffer, level: StorageLevel): Boolean = {
// 尝试存储到内存
if (level.useMemory) {
val putSucceeded = memoryStore.putBytes(blockId, bytes, level.memoryMode)
if (putSucceeded) return true
}
// 内存不足,考虑存储到磁盘
if (level.useDisk) {
diskStore.putBytes(blockId, bytes)
return true
}
false
}
}
MemoryStore是BlockManager的内存存储组件,它直接与MemoryManager交互,申请和释放内存资源。当内存不足时,MemoryStore会根据存储级别和淘汰策略,驱逐部分数据块,为新数据腾出空间。这一过程涉及复杂的决策逻辑,如考虑数据块大小、访问频率和缓存策略等因素。
private[spark] class MemoryStore(
blockManager: BlockManager,
memoryManager: MemoryManager) extends Logging {
// 内存中的数据块
private val entries = new LinkedHashMap[BlockId, MemoryEntry[_]](32, 0.75f, true)
// 将数据存入内存
def putBytes(blockId: BlockId, size: Long, memoryMode: MemoryMode): Boolean = {
// 尝试从内存管理器申请空间
val acquired = memoryManager.acquireStorageMemory(blockId, size, memoryMode)
if (acquired < size) {
// 内存不足,需要腾出空间
val spaceToFree = size - acquired
if (memoryMode == MemoryMode.ON_HEAP) {
// 尝试驱逐数据块
val freedOrEvict = freeSpaceToShrinkPool(spaceToFree, memoryMode)
if (freedOrEvict) {
// 再次尝试申请内存
val acquiredAfterEviction = memoryManager.acquireStorageMemory(blockId, size, memoryMode)
if (acquiredAfterEviction < size) {
// 仍然不足,放弃存储
return false
}
} else {
// 无法腾出足够空间
return false
}
} else {
// 堆外内存不足
return false
}
}
// 内存充足,存储数据
entries.put(blockId, new SerializedMemoryEntry(bytes, size, memoryMode))
true
}
// 驱逐数据块腾出空间
private def evictBlocksToFreeSpace(space: Long, memoryMode: MemoryMode): Long = {
// 按LRU策略选择待驱逐的数据块
val iterator = entries.entrySet().iterator()
var freedMemory = 0L
while (freedMemory < space && iterator.hasNext) {
val entry = iterator.next()
val blockId = entry.getKey
val memoryEntry = entry.getValue
// 检查是否可以驱逐
if (memoryEntry.memoryMode == memoryMode && blockManager.canEvict(blockId)) {
// 从内存移除
iterator.remove()
freedMemory += memoryEntry.size
// 如果需要,写入磁盘
if (blockManager.shouldStore(blockId)) {
blockManager.getDiskStore.put(blockId, memoryEntry.getBytes)
}
// 更新元数据
blockManager.notifyEviction(blockId, memoryEntry.size, diskStore.contains(blockId))
}
}
freedMemory
}
}
内存与磁盘的协作是Spark存储系统的一大特色。当内存不足时,系统会根据存储级别,将部分数据溢写到磁盘;当需要访问这些数据时,会自动从磁盘加载。这种透明的数据流动机制使得Spark能够处理超出内存容量的大规模数据集,同时保持较高的性能。
溢写策略是内存与磁盘协作的关键。Spark采用多种策略控制数据溢写过程,如设置溢写阈值、使用代价估算模型选择溢写数据块、实现批量溢写减少IO开销等。这些策略平衡了内存效率和磁盘IO开销,使系统在各种工作负载下都能保持良好性能。
通过这种紧密的交互关系,内存管理系统和存储系统共同构成了Spark数据处理的底层基础,支撑起上层的各种计算和转换操作。理解这一关系,有助于更好地配置和优化Spark应用,提高资源利用效率和计算性能。
Tungsten优化与内存对齐
Tungsten是Spark的内存和CPU效率优化项目,它通过一系列底层优化,显著提升了系统性能。在内存管理方面,Tungsten引入了直接内存操作、二进制格式和缓存友好的数据布局等创新,犹如将普通轿车升级为赛车,通过引擎和传动系统的优化,实现了性能的质的飞跃。
直接内存管理是Tungsten的核心特性之一。与传统的Java对象模型不同,Tungsten直接在堆外内存中分配和管理数据,绕过了Java垃圾收集器,减少了内存开销和GC暂停时间。这种设计特别适合大规模数据处理,因为大量小对象的创建和回收是传统JVM性能瓶颈的主要来源。
public class UnsafeRow extends MutableRow {
private final int numFields;
private final long baseOffset;
private final byte[] baseObject;
// 根据偏移量获取数据
public boolean getBoolean(int ordinal) {
assertIndexIsValid(ordinal);
return Platform.getBoolean(baseObject, baseOffset + ordinal);
}
public byte getByte(int ordinal) {
assertIndexIsValid(ordinal);
return Platform.getByte(baseObject, baseOffset + ordinal);
}
// 其他类型的获取方法...
// 设置数据值
public void setBoolean(int ordinal, boolean value) {
assertIndexIsValid(ordinal);
Platform.putBoolean(baseObject, baseOffset + ordinal, value);
}
// 其他类型的设置方法...
}
内存对齐是Tungsten优化的另一重要方面。Tungsten实现了数据结构的字节级对齐,确保数据访问与CPU缓存行边界一致,减少了缓存未命中和内存访问延迟。这种优化在数据密集型操作(如排序和Join)中尤其有效,可以显著提升CPU效率。
二进制数据格式是Tungsten的另一创新。传统上,Spark使用Java对象表示数据,这些对象包含大量元数据和引用,增加了内存占用。Tungsten引入了紧凑的二进制格式(如UnsafeRow),直接在序列化数据上操作,避免了反序列化开销,同时减少了内存占用。
// 二进制数据格式示例
// 8位的数据结构:
// [null标志位(1字节)][值长度前缀(4字节)][实际数据]
private static void encodeString(
byte[] target, int offset, String value) {
// 写入null标志位
target[offset] = value == null ? 1 : 0;
if (value != null) {
// 将字符串转换为UTF-8字节
byte[] bytes = value.getBytes(StandardCharsets.UTF_8);
// 写入长度
Platform.putInt(target, offset + 1, bytes.length);
// 写入实际数据
System.arraycopy(bytes, 0, target, offset + 5, bytes.length);
}
}
代码生成是Tungsten的高级优化特性。通过在运行时生成专用的字节码,Tungsten能够针对特定数据结构和操作模式优化执行代码,减少了通用代码的开销,提高了指令缓存效率和分支预测成功率。这种"即时编译"方法在SQL查询和复杂表达式计算中特别有效。
这些优化技术共同构成了Tungsten的内存优化体系,使Spark在处理大规模数据时能够达到接近原生系统的性能水平。对于任务执行和资源管理而言,Tungsten优化意味着更高的计算效率和更低的资源消耗,进一步提升了系统的整体性能和可扩展性。
值得注意的是,Tungsten优化并非对所有场景都同等有效。对于小数据集或IO密集型操作,其收益可能有限;而对于内存密集型计算,如复杂聚合和Join操作,Tungsten能够带来显著的性能提升。了解这些特性,有助于开发者更好地设计和优化Spark应用,充分发挥系统潜力。
事件通知系统
事件通知系统是Spark任务执行框架中的重要组成部分,它实现了任务执行过程中的异步处理、状态更新和资源管理。通过观察者模式的设计,事件系统使各组件能够灵活地响应状态变化,而无需强耦合,提高了系统的可扩展性和健壮性。
观察者模式应用
观察者模式是一种行为设计模式,它定义了对象间的一对多依赖关系,当一个对象状态变化时,所有依赖它的对象都会得到通知。Spark的事件通知系统是观察者模式的典型应用,通过这一模式,实现了系统各组件间的松耦合协作。
LiveListenerBus是Spark事件系统的核心,它充当了观察者模式中的"主题”(Subject)角色,维护了一系列监听器(Observer),并在事件发生时通知这些监听器。事件提交到LiveListenerBus后,会异步分发给对应的监听器处理,实现了事件产生和处理的解耦。
private[spark] class LiveListenerBus(conf: SparkConf) extends SparkListenerBus {
// 事件队列,按类型分组
private[spark] val queues = new CopyOnWriteArrayList[(String, AsyncEventQueue)]
// 提交事件
def post(event: SparkListenerEvent): Unit = {
// 将事件分发到各个队列
val it = queues.iterator()
while (it.hasNext()) {
it.next()._2.post(event)
}
}
// 添加事件监听器
def addToQueue(listener: SparkListener, queue: String): Unit = {
// 找到目标队列,添加监听器
val targetQueue = queues.asScala.find(_._1 == queue)
targetQueue.foreach(_._2.addListener(listener))
}
}
SparkListener接口定义了观察者的角色,所有关注Spark事件的组件都实现这一接口。接口中包含多个事件处理方法,如onTaskStart、onTaskEnd、onStageCompleted、onJobStart等,分别对应不同类型的事件。实现类可以选择性地重写这些方法,只处理自己关注的事件类型。
trait SparkListener {
// 任务相关事件
def onTaskStart(taskStart: SparkListenerTaskStart): Unit = { }
def onTaskGettingResult(taskGettingResult: SparkListenerTaskGettingResult): Unit = { }
def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = { }
// Stage相关事件
def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted): Unit = { }
def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit = { }
// Job相关事件
def onJobStart(jobStart: SparkListenerJobStart): Unit = { }
def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = { }
// 其他事件...
}
事件类继承体系是观察者模式中的"通知信息"部分。所有事件类型都继承自SparkListenerEvent接口,包含任务事件(TaskStarted、TaskEnd等)、Stage事件(StageSubmitted、StageCompleted等)和Job事件(JobStart、JobEnd等)等多种类型。这些事件包含了丰富的上下文信息,使监听器能够了解事件的详细情况,并做出相应处理。
// 事件类示例
case class SparkListenerTaskStart(
stageId: Int,
stageAttemptId: Int,
taskInfo: TaskInfo) extends SparkListenerEvent
case class SparkListenerTaskEnd(
stageId: Int,
stageAttemptId: Int,
taskType: String,
reason: TaskEndReason,
taskInfo: TaskInfo,
metrics: TaskMetrics) extends SparkListenerEvent
观察者模式的应用带来了多项优势:首先,它实现了事件产生者和消费者的解耦,允许系统轻松添加新的监听器而不影响现有组件;其次,事件处理的异步特性避免了对关键执行路径的阻塞,提高了系统响应性;最后,基于事件的设计使得系统行为更加透明和可追踪,便于监控和调试。
任务生命周期事件
任务生命周期事件是Spark事件系统中最活跃的部分,它跟踪任务从创建到完成的整个过程,为监控、调试和优化提供了丰富的信息。这些事件就像是任务执行过程中的"路标",标记了关键阶段并记录相关数据,帮助我们理解和优化任务执行。
任务生命周期的主要事件包括:
TaskSubmitted事件标志着任务被提交到调度系统,等待分配资源和执行。这一事件包含任务的基本信息,如所属Stage、分区ID、任务属性等。系统会在DAGScheduler提交任务到TaskScheduler时触发此事件。
TaskStarted事件表示任务开始执行,已分配到具体的执行器并启动运行。事件中包含执行器ID、主机地址和启动时间等信息,可用于分析任务调度延迟和执行位置分布。
TaskGettingResult事件表明任务已完成计算,正在获取结果。对于远程任务,这可能涉及将结果数据传输回Driver,这一阶段的延迟对于大结果集尤为重要。
TaskEnd事件标志着任务执行的结束,无论成功还是失败。事件携带了丰富的信息,包括执行时间、结果大小、成功或失败原因,以及详细的度量数据(如CPU时间、内存使用、IO字节数等)。这些信息对于性能分析和故障诊断至关重要。
// 在TaskRunner中触发任务事件
def run(): Unit = {
// 任务开始
val taskStartTime = System.currentTimeMillis()
var taskStarted = false
try {
// 反序列化并准备任务
// ...
// 发布任务开始事件
eventProcessLoop.post(TaskStarted(task.stageId, task.stageAttemptId, taskInfo))
taskStarted = true
// 执行任务
val value = task.run(taskAttemptId)
// 获取结果
eventProcessLoop.post(TaskGettingResult(taskInfo))
// 序列化结果
val resultSer = env.serializer.newInstance()
val serializedResult = resultSer.serialize(value)
// 发布任务完成事件
eventProcessLoop.post(
TaskEnd(task.stageId, task.stageAttemptId, "SUCCESS",
null, taskInfo, taskMetrics))
} catch {
case e: Exception =>
// 发布任务失败事件
if (taskStarted) {
eventProcessLoop.post(
TaskEnd(task.stageId, task.stageAttemptId, "FAILED",
TaskFailedReason.fromError(e), taskInfo, taskMetrics))
}
throw e
}
}
这些事件不仅是记录,更是系统各组件协作的基础。通过监听和处理这些事件,Spark实现了多种功能:
进度追踪与可视化。SparkUI监听任务事件,实时更新作业进度和状态,提供直观的可视化界面,帮助用户了解应用执行情况。
性能监控与分析。度量收集器聚合任务事件中的性能数据,计算统计信息,识别性能瓶颈和异常模式,为性能优化提供依据。
资源管理与调度。ExecutorAllocationManager基于任务积压和完成情况,动态调整执行器数量;TaskScheduler利用任务完成信息优化后续调度决策。
故障检测与恢复。通过监听任务失败事件,系统能够快速识别问题节点和故障模式,实施重试或故障转移,确保计算的可靠性。
通过这些丰富的事件和灵活的监听机制,Spark构建了一个透明、可扩展的任务执行环境,使系统各组件能够协同工作,共同支持高效可靠的分布式计算。
技术关联
任务执行与资源管理作为Spark核心引擎的关键环节,与其他组件和技术有着广泛的关联。理解这些关联关系,有助于从整体架构的角度把握Spark的设计理念和运行机制,为应用开发和系统优化提供全局视野。
与RDD和Stage划分的关系
任务执行系统与RDD内部实现和Stage划分机制有着紧密的联系,它们共同构成了Spark执行引擎的核心流程。这种关系就像是一条生产线上的连续环节,前一环节的输出直接影响后一环节的处理过程和效率。
RDD内部结构直接决定了任务执行的计算逻辑。每个RDD的compute方法定义了如何处理分区数据,这一逻辑会被封装到任务中,由执行器执行。例如,MapPartitionsRDD的compute方法在分区数据上应用用户定义的函数,这一逻辑会成为ShuffleMapTask或ResultTask的核心计算部分。
override def compute(split: Partition, context: TaskContext): Iterator[U] = {
// MapPartitionsRDD的计算逻辑
f(context, split.index, firstParent[T].iterator(split, context))
}
// 上述计算逻辑会被封装到任务中
def runTask(context: TaskContext): MapStatus = {
// 调用RDD的compute方法处理分区
val rddIterator = rdd.iterator(partition, context)
// 进行后续处理...
}
RDD的依赖关系影响任务执行的数据流。窄依赖允许任务内部流水线执行,多个转换操作在一个任务中串联处理;而宽依赖则需要通过Shuffle交换数据,导致任务间的依赖和数据传输。这种依赖结构直接影响了任务执行的效率和资源需求。
Stage划分是连接RDD和任务执行的桥梁。DAGScheduler将RDD DAG划分为多个Stage,并为每个Stage创建对应的任务集(TaskSet)。ShuffleMapStage和ResultStage分别对应ShuffleMapTask和ResultTask,两者处理逻辑不同:前者需要将结果写入Shuffle系统,后者则将结果返回Driver或写入存储系统。
分区划分直接影响任务数量和并行度。RDD的partitions方法返回分区数组,每个分区对应一个任务。分区数量决定了任务数量,进而影响资源利用和负载均衡。合理的分区策略对于优化任务执行至关重要,过少的分区会限制并行度,过多的分区则增加调度开销。
数据本地性信息指导任务调度。RDD的preferredLocations方法提供了分区数据的位置偏好,TaskScheduler利用这些信息将任务调度到数据所在位置,减少数据传输开销。这种数据本地性优化是Spark性能优势的重要来源。
这种紧密的关联关系要求开发者在设计RDD转换链和配置参数时,同时考虑任务执行和资源管理的影响。例如,合理设置分区数量、选择适当的存储级别、优化Shuffle操作,都能显著影响任务执行效率和资源利用率。
与并发模型和调度系统的协作
任务执行与资源管理系统与Spark的并发模型和调度系统密切协作,共同实现高效的分布式计算。这种协作关系就像是交响乐团中不同乐器组的配合,虽然各自演奏不同的旋律,但共同构成了和谐的整体。
并发模型是任务执行系统的基础。Spark采用基于任务的并发模型,每个任务在单独的线程中执行,相互独立且并行。这种模型结合了数据并行和任务并行的优势,既利用了单机多核的计算能力,又支持跨节点的分布式执行。任务执行系统通过线程池管理任务执行,确保资源的高效利用和负载均衡。
// Executor中的线程池管理
private val threadPool = ThreadUtils.newDaemonCachedThreadPool("Executor task launch worker")
def launchTask(context: ExecutorBackend, taskDesc: TaskDescription): Unit = {
val tr = new TaskRunner(context, taskDesc)
runningTasks.put(taskDesc.taskId, tr)
threadPool.execute(tr) // 任务在独立线程中执行
}
任务窃取(Task Stealing)是一种高级并发优化技术,它允许空闲执行器从繁忙执行器"窃取"任务,提高资源利用率。这种技术在负载不均衡或任务执行时间差异大的场景特别有效。Spark的任务窃取主要通过调度层实现,将未开始执行的任务重新分配给空闲资源。
调度系统与任务执行的协作体现在多个方面。TaskScheduler负责任务的调度和资源分配,它考虑多种因素,如数据本地性、公平共享和资源限制,为任务找到最合适的执行位置。任务执行系统则负责任务的实际运行和状态管理,并通过事件系统反馈执行情况,供调度系统作出下一步决策。
// TaskSchedulerImpl中的任务调度逻辑
private def resourceOfferSingleTaskSet(
taskSet: TaskSetManager,
maxLocality: TaskLocality,
availableCpus: Array[Int],
availableResources: Array[Map[String, Buffer[String]]],
tasks: IndexedSeq[Int]): Boolean = {
var launchedTask = false
// 按本地性级别尝试调度任务
for (i <- 0 until availableCpus.length if !launchedTask) {
val execId = s"executor_${i}"
val host = s"host_${i}"
taskSet.resourceOffer(execId, host, maxLocality, availableResources(i)) match {
case Some(task) =>
// 找到匹配任务,分配资源
launchedTask = true
// ...省略详细逻辑
case None => // 无匹配任务
}
}
launchedTask
}
公平调度与容量调度是Spark支持的两种主要调度策略。公平调度确保所有活跃作业按权重比例分享资源,避免资源独占;容量调度则根据预先分配的资源池,为不同作业提供隔离的资源空间。这些策略通过与任务执行系统的协作,实现了多用户环境下的资源共享和隔离。
延迟调度是一种优化数据本地性的技术。当找不到满足最佳本地性的任务时,调度器不会立即降级到次优级别,而是等待一段时间,以期有更好的资源出现。这种策略平衡了数据本地性和执行延迟,通常能够提高整体性能。
并发控制与资源隔离是协作的重要方面。为避免资源争用和干扰,系统实现了多级隔离机制:执行器级隔离确保不同应用的资源边界;任务级隔离通过内存管理和调度策略,避免任务间相互影响;操作级隔离则通过内存分配和释放策略,控制具体操作的资源使用。
对SQL和Streaming的支持
任务执行与资源管理系统不仅支持基本的RDD操作,还为高级API如SparkSQL和Structured Streaming提供了强大支持。这种支持使得Spark能够统一处理批处理和流处理工作负载,实现"一套引擎,多种模式"的设计目标。
SparkSQL的查询执行直接构建在任务执行系统之上。SQL查询经过解析、优化和物理计划生成后,最终转换为RDD操作和任务执行。这一转换过程利用了Spark的基础执行引擎,同时引入了多项优化,如全阶段代码生成、谓词下推和列式存储等,提升查询性能。任务执行系统通过灵活的资源分配和高效的内存管理,为这些优化提供了基础支持。
// SparkSQL执行查询的简化过程
def sql(sqlText: String): DataFrame = {
// 解析SQL为逻辑计划
val logicalPlan = parser.parsePlan(sqlText)
// 优化逻辑计划
val optimizedPlan = optimizer.execute(logicalPlan)
// 生成物理计划
val physicalPlan = planner.plan(optimizedPlan).next()
// 转换为RDD操作并执行
val rdd = physicalPlan.execute()
// 返回结果DataFrame
Dataset.ofRows(sparkSession, logicalPlan)
}
Structured Streaming基于微批处理模型,将连续的数据流分解为一系列小批量处理。每个微批被视为一个单独的批处理作业,经过DAG调度器生成任务并执行。任务执行系统的低延迟和高吞吐特性,使其成为流处理的理想基础。同时,系统的容错机制和状态管理能力,也为长时间运行的流处理作业提供了可靠保障。
内存管理对SQL和流处理有特殊优化。对于SQL查询,系统提供了列式内存格式和堆外存储,减少了内存占用和GC开销;对于流处理,系统实现了高效的状态存储和管理,支持有状态的持续计算。这些优化使得Spark在处理复杂查询和长时间运行的流处理作业时,能够保持高性能和稳定性。
资源管理策略针对不同工作负载特点进行了调整。SQL查询通常对响应时间敏感,系统提供了公平调度和资源池隔离,确保交互式查询不被长时间运行的批处理作业阻塞;流处理则需要稳定的资源供应,系统通过专用资源池和优先级调度,保证流处理任务的连续执行。
动态资源分配对不同应用场景有针对性优化。对于批处理,系统根据任务积压情况动态增减执行器,优化资源利用;对于流处理,系统倾向于维持稳定的执行器数量,避免频繁的资源波动导致性能抖动。这种灵活的资源策略使得Spark能够适应各种复杂的混合工作负载。
任务执行与资源管理系统通过这些特性和优化,为SQL和流处理提供了强大支持,使得Spark成为一个真正的统一计算引擎,能够高效处理各种数据处理需求,从批量ETL到实时分析,从简单聚合到复杂机器学习,都能得到优异的性能和可靠的执行保证。
参考资料
[1] Matei Zaharia et al. Resilient Distributed Datasets: A Fault-Tolerant Abstraction for In-Memory Cluster Computing. NSDI 2012.
[2] Kay Ousterhout et al. Making Sense of Performance in Data Analytics Frameworks. NSDI 2015.
[3] Michael Armbrust et al. Spark SQL: Relational Data Processing in Spark. SIGMOD 2015.
[4] Apache Spark 官方文档. https://spark.apache.org/docs/latest/job-scheduling.html
[5] Holden Karau, Andy Konwinski, Patrick Wendell, Matei Zaharia. Learning Spark: Lightning-Fast Big Data Analysis. O’Reilly Media, 2015.
被引用于
[1] Spark-容错机制实现
[2] Spark-内存优化技术
[3] Spark-故障诊断与排查