Spark内存管理原理(上)

          Spark运行是内存分为三部分,执行内存(execute memory),存储内存(storge memory),预留内存(reserved memory).在1.6版本以前执行内存和存储内存是静态分配的,意思着应用一启动后,各区域的内存大小就是不变的。这就会带来一个后果,有时执行内存严重不足,但是存储内存又都没怎么用,或者相反。因为这个问题,spakr1.6版本之后引入的动态内存管理机制。本文就是主要讲讲这一部分的内存。

一、新旧内存管理

1、旧有(1.6版本之前)的内存管理
概念上,内存空间被分成了三块独立的区域,每块区域的内存容量是按照JVM堆大小的固定比例进行分配的:
Execution:在执行shuffle、join、sort和aggregation时,用于缓存中间数据。通过spark.shuffle.memoryFraction进行配置,默认为0.2。
Storage:主要用于缓存数据块以提高性能,同时也用于连续不断地广播或发送大的任务结果。通过

1`
spark.storage.memoryFraction进行配置,默认为0.6。
Other:这部分内存用于存储运行Spark系统本身需要加载的代码与元数据,默认为0.2。
无论是哪个区域的内存,只要内存的使用量达到了上限,则内存中存储的数据就会被放入到硬盘中,从而清理出足够的内存空间。这样一来,由于与执行或存储相关的数据在内存中不存在,就会影响到整个系统的性能,导致I/O增长,或者重复计算。

2、1.6版本之后

到了1.6版本,Execution Memory和Storage Memory之间支持跨界使用。当执行内存不够时,可以借用存储内存,反之亦然。
1.6版本的实现方案支持借来的存储内存随时都可以释放,但借来的执行内存却不能如此。
新的版本引入了新的配置项:
spark.memory.fraction(默认值为0.75):用于设置存储内存和执行内存占用堆内存的比例。若值越低,则发生spill和evict的频率就越高。注意,设置比例时要考虑Spark自身需要的内存量。
spark.memory.storageFraction(默认值为0.5):显然,这是存储内存所占spark.memory.fraction设置比例内存的大小。当整体的存储容量超过该比例对应的容量时,缓存的数据会被evict。
spark.memory.useLegacyMode(默认值为false):若设置为true,则使用1.6版本前的内存管理机制。此时,如下五项配置均生效:
spark.storage.memoryFraction
spark.storage.safetyFraction
spark.storage.unrollFraction
spark.shuffle.memoryFraction
spark.shuffle.safetyFraction

二、源码

1、在SparkConf实例化的过程中会创建一个SparkEnv对像,其中有这么一段:

1 val useLegacyMemoryManager = conf.getBoolean("spark.memory.useLegacyMode", false) 2 val memoryManager: MemoryManager = 3 if (useLegacyMemoryManager) { //使用静态内存管理 4 new StaticMemoryManager(conf, numUsableCores) 5 } else { //使用动态内存管理 6 UnifiedMemoryManager(conf, numUsableCores) 7 } 8
1我们可以通过显示的在应用的启动脚本里设置 2

--conf spark.memory.useLegacyMode = true,来使用1.6版本之前的静态内存管理方式。默认是使用动态内存

2、StaticMemoryManager

接下来看看StaticMemoryManager实例化的过程,它在包/org/apache/spark/memory下。

首先看看返回的最大的可用存储内存

1 //返回可用最大的存储内存 2 private def getMaxStorageMemory(conf: SparkConf): Long = { 3 val systemMaxMemory = conf.getLong("spark.testing.memory", Runtime.getRuntime.maxMemory)//spark.testing.memory没有设置的话,取系统可用最大内存 4 val memoryFraction = conf.getDouble("spark.storage.memoryFraction", 0.6) //默认取0.6 5 val safetyFraction = conf.getDouble("spark.storage.safetyFraction", 0.9) //安全系统0.9 6 (systemMaxMemory * memoryFraction * safetyFraction).toLong //最大存储内存=0.6*0.9*系统可用最大内存 7 } 8
1默认最大也就0.54\*系统最大内存 2

再来看看返回可用的最大执行内存

1 //返回系统最大的执行内存 2 private def getMaxExecutionMemory(conf: SparkConf): Long = { 3 val systemMaxMemory = conf.getLong("spark.testing.memory", Runtime.getRuntime.maxMemory) 4 5 if (systemMaxMemory < MIN_MEMORY_BYTES) { //最大可用内存小于最小内存抛出异常 6 throw new IllegalArgumentException(s"System memory $systemMaxMemory must " + 7 s"be at least $MIN_MEMORY_BYTES. Please increase heap size using the --driver-memory " + 8 s"option or spark.driver.memory in Spark configuration.") 9 } 10 if (conf.contains("spark.executor.memory")) { //判断单个节点的memory是否小于最小内存,小于的话,招出异常 11 val executorMemory = conf.getSizeAsBytes("spark.executor.memory") 12 if (executorMemory < MIN_MEMORY_BYTES) { 13 throw new IllegalArgumentException(s"Executor memory $executorMemory must be at least " + 14 s"$MIN_MEMORY_BYTES. Please increase executor memory using the " + 15 s"--executor-memory option or spark.executor.memory in Spark configuration.") 16 } 17 } 18 val memoryFraction = conf.getDouble("spark.shuffle.memoryFraction", 0.2) //取系数0.2 19 val safetyFraction = conf.getDouble("spark.shuffle.safetyFraction", 0.8) //取系数0.8 20 (systemMaxMemory * memoryFraction * safetyFraction).toLong //最大可用的执行内存=0.2*0.8*系统可用最大内存 21 } 22
1默认最大也就0.16\*系统最大内存 2

从上面来看的话,0.54+0.16 = 0.7,还有0.3去哪儿了?最大执行内存和存储内存都有预留一部分,以防止直接OOM。还有0.1是用来放一些中间文件,如.class文件,执行过程中生成的一些中间文件等。StaticMemoryManager这个类还是比较简单。

然后上面的两个方法在StaticMemoryManager这个类实例化是会被调用

1 def this(conf: SparkConf, numCores: Int) { 2 this( 3 conf, 4 StaticMemoryManager.getMaxExecutionMemory(conf), //获取最在执行内存 5 StaticMemoryManager.getMaxStorageMemory(conf), //获取最大存储内存 6 numCores) 7 } 8

3、UnifiedMemoryManager

UnifiedMemoryManager下同是一个伴生对象,所以在1中,可以不用new关键字。

下面具体看看

1object UnifiedMemoryManager { 2 3 private val RESERVED_SYSTEM_MEMORY_BYTES = 300 * 1024 * 1024 4 5 def apply(conf: SparkConf, numCores: Int): UnifiedMemoryManager = { 6 val maxMemory = getMaxMemory(conf) 7 new UnifiedMemoryManager( 8 conf, 9 maxHeapMemory = maxMemory, 10 onHeapStorageRegionSize = 11 (maxMemory * conf.getDouble("spark.memory.storageFraction", 0.5)).toLong, 12 numCores = numCores) 13 } 14 15 //存储和执行内存的最大安全可用大小 16 private def getMaxMemory(conf: SparkConf): Long = { 17 val systemMemory = conf.getLong("spark.testing.memory", Runtime.getRuntime.maxMemory)//默认取最大可用内存 18 val reservedMemory = conf.getLong("spark.testing.reservedMemory", 19 if (conf.contains("spark.testing")) 0 else RESERVED_SYSTEM_MEMORY_BYTES) //系统预留内存,如有有设置spark.testing字段那么reservedMemory =0 20 val minSystemMemory = (reservedMemory * 1.5).ceil.toLong //系统需要的最小内存 21 if (systemMemory < minSystemMemory) { //当可用的内存小于系统需要的最小内存,就抛出异常 22 throw new IllegalArgumentException(s"System memory $systemMemory must " + 23 s"be at least $minSystemMemory. Please increase heap size using the --driver-memory " + 24 s"option or spark.driver.memory in Spark configuration.") 25 } 26= 27 if (conf.contains("spark.executor.memory")) { //判断单个节点设置的内存是否小于需要的最小内存 28 val executorMemory = conf.getSizeAsBytes("spark.executor.memory") 29 if (executorMemory < minSystemMemory) { 30 throw new IllegalArgumentException(s"Executor memory $executorMemory must be at least " + 31 s"$minSystemMemory. Please increase executor memory using the " + 32 s"--executor-memory option or spark.executor.memory in Spark configuration.") 33 } 34 } 35 val usableMemory = systemMemory - reservedMemory //存储和执行内存的可用内存 36 val memoryFraction = conf.getDouble("spark.memory.fraction", 0.6)//存储和执行内存的安全系数 37 (usableMemory * memoryFraction).toLong //存储和执行内存的最大安全可用内存 = 0.6 * (系统可用内存-预留内存) 38 } 39} 40 41
1 从这里可以看到,已经没有严格区分存储和执行内存的大小,而是统一为两者申请。所以存储和执行共享内存=0.6\*(系统可用内存-系统预留内存) 2

参考文章:

http://blog.csdn.net/wisgood/article/details/51436321

代码交流 2021