spark内存管理源码分析系列之内存管理概述

Spark 作为一个基于内存的分布式计算引擎,其内存管理模块在整个系统中扮演着非常重要的角色。理解 Spark 内存管理的基本原理,有助于更好地开发 Spark 应用程序和进行性能调优。本文介绍了Spark的内存框架以及介绍了Spark的哪些模块使用到了内存管理功能,Driver端内存管理比较简单,所以后续的分析都专注于Executor端的内存管理。

内存整体框架

作为一个 JVM 进程,Executor 的内存管理建立在 JVM 的内存管理之上,Spark 对 JVM 的堆内[On-heap]空间进行了更为详细的分配,以充分利用内存。同时,Spark 引入了堆外[Off-heap]内存,使之可以直接在工作节点的系统内存中开辟空间,进一步优化了内存的使用,整体架构如下所示:

!

对于一个Executor中,内存的相关结构如下所示:

在这里插入图片描述

  1. org.apache.spark.memory.MemoryManager是JVM级别的内存管理者,负责具体内存的分配和回收;
  2. org.apache.spark.memory.TaskMemoryManager是任务级别的内存管理者,每个任务有一个TaskMemoryManager,它负责Spark任务内存的分配和回收,任务的内存申请通过该管理者跟MemeoryManager进行交互;
  3. org.apache.spark.memory.MemoryConsumer是内存的申请的client,发送请求到TaskMemoryManager,然后经过MemoryManager的内存分配和回收操作;
  4. org.apache.spark.memory.MemoryPool内存池,分为存储内存池和执行内存池,是Spark内存的最基本的抽象,由MemoryManager进行内存的协调。

堆内内存

当我们提交Spark任务时候,会指定–executor-memory 或 spark.executor.memory参数,该部分内存即为堆内内存的大小。Executor内并发运行的任务<由-executor-core或者spark.executor.core参数控制>共享 JVM 堆内内存,Spark任务执行过程中缓存的RDD数据和广播数据占用的内存被规划为存储[Storage]内存,Spark任务在执行Shuffle、Join、Aggregation时占用的内存被规划为执行[Execution]内存,剩余的部分不做特殊规划,那些Spark内部的对象实例,或者用户定义的 Spark 应用程序中的对象实例,均占用剩余的空间。

Spark 对堆内内存的管理是一种逻辑上的”规划式”的管理,因为对象实例占用内存的申请和释放都由 JVM 完成,Spark 只能在申请后和释放前记录这些内存。

堆外内存

为了进一步优化内存的使用以及提高 Shuffle 时排序的效率,Spark 引入了堆外[Off-heap]内存,使之可以直接在工作节点的系统内存中开辟空间,存储经过序列化的二进制数据。利用 JDK Unsafe API<实质上等同于C++中可以自己进行内存管理>,这样子减少了不必要的内存开销<java对象有对象头等开销>,以及JVM的频繁GC扫描和回收,提升了处理性能。堆外内存可以被精确地申请和释放,而且序列化的数据占用的空间可以被精确计算,所以相比堆内内存来说降低了管理的难度,也降低了误差。在默认情况下堆外内存并不启用,可通过配置spark.memory.offHeap.enabled 参数启用,并由spark.memory.offHeap.size 参数设定堆外空间的大小。除了没有 other 空间,堆外内存与堆内内存的划分方式相同,所有运行中的并发任务共享存储内存和执行内存。

哪些情况下会使用内存

Spark中用到内存的地方有哪些?存储内存主要消耗在哪些地方?执行内存主要消耗在哪些地方?

存储内存

首先我们来看看存储内存,Spark任务中数据的缓存或者广播数据以及RDD缓存副本大于1时候会使用到存储内存。

Cache/persist存储内存申请

当RDD的Storage Level包括memory时[也就是调用了RDD.cache或RDD.persist将RDD数据缓存到了memory中],Task在计算得到RDD分区数据时会申请存储内存将数据缓存在内存中。

ShuffleMapTask/ResultTask.runTask -> RDD.iterator -> RDD.getOrCompute -> BlockManager.getOrElseUpdate -> BlockManager.doPutIterator -> MemoryStore.putIteratorAsBytes/MemoryStore.putIteratorAsValues -> MemoryStore.putIterator

ShuffleMapTask/ResultTask.runTask -> RDD.iterator -> RDD.getOrCompute -> BlockManager.getOrElseUpdate -> BlockManager.getLocalValues -> BlockManager.maybeCacheDiskValuesInMemory -> MemoryStore.putIteratorAsValues -> MemoryStore.putIterator

ShuffleMapTask/ResultTask.runTask -> RDD.iterator -> RDD.getOrCompute -> RDD.computeOrReadCheckpoint -> WriteAheadLogBackedBlockRDD.compute -> WriteAheadLogBackedBlockRDD.compute$getBlockFromWriteAheadLog -> blockManager.putBytes -> BlockManager.doPutBytes -> MemoryStore.putIteratorAsValues -> MemoryStore.putIterator

Broadcast存储内存申请

对广播变量进行存储/缓存也会用到存储内存,写入和读取的代码调用路径如下所示:

TorrentBroadcast.writeBlocks -> BlockManager.putSingle -> BlockManager.putIterator -> BlockManager.doPutIterator -> MemoryStore.putIteratorAsValues/MemoryStore.putIteratorAsValues -> MemoryStore.putIterator

TorrentBroadcast.readBroadcastBlock -> BlockManager.getLocalValues -> BlockManager.maybeCacheDiskValuesInMemory -> MemoryStore.putIteratorAsValues -> MemoryStore.putIterator

RDD block Replication存储内存申请

当RDD的storage level中的**_replication**大于1时,BlockManager需要将block数据发到另一个远程结点以备份,此时BlockManager会向远程结点发送UploadBlock消息,远程结点在收到该消息后会申请存储内存以存放收到的block数据。

NettyBlockRpcServer.receive -> BlockDataManager.putBlockData -> BlockManager.putBytes -> BlockManager.doPutBytes -> MemoryStore.putIteratorAsValues -> MemoryStore.putIterator

NettyBlockRpcServer.receiveStream -> BlockDataManager.putBlockDataAsStream -> BlockManager.putBytes -> BlockManager.doPutBytes -> MemoryStore.putIteratorAsValues -> MemoryStore.putIterator

Task运行结果数据相关的存储内存申请

TaskRunner处理task结果数据时,如果task结果数据大于maxDirectResultSize,则会将其存储到本地blockManager,然后将block的meta数据返回给driver,并且这个时候用的storeage level是MEMORY_AND_DISK_SER, 所以会向MemoryManager申请存储内存。

  1. TaskRunner.run -> BlockManager.putBytes -> BlockManager.doPutBytes -> MemoryStore.putIteratorAsValues -> MemoryStore.putIterator

执行内存

执行内存主要用于Spark任务在内存中进行Shuffle、Join、Sort以及Aggregations等计算操作,主要是各种MemoryConsumer向TaskMemoryManager申请内存,具体的后续文章会详细介绍。

参考

  1. https://www.jianshu.com/p/87a36488993a
  2. https://www.jianshu.com/p/f29fc887e89f
  3. https://developer.ibm.com/zh/articles/ba-cn-apache-spark-memory-management/

代码交流 2021