shuffle原理和内存溢出原因

错误异常:Error: org.apache.hadoop.mapreduce.task.reduce.Shuffle$ShuffleError: error in shuffle in fetcher

Redue Shuffle过程及参数:

先附上一个MR整个过程的大图:

过程:

1.EventFetcher 负责向MRAppMaster获取已经运行完的Map信息,这些信息包括Map编号和运行Map的服务器。
2.ShuffleScheduler负责调度Shuffle任务。
3.各Fetcher线程从ShuffleScheduler取任务,进行实际Map数据获取。默认5个Fetcher线程 。

参数:

    mapreduce.reduce.shuffle.input.buffer.percent:
shuffle使用的内存比例,默认是0.7。Shuffle内存为总内存 * 0.7。
mapreduce.reduce.shuffle.memory.limit.percent: 
单个shuffle任务能使用的内存限额,默认是0.25,即为 Shuffle内存 * 0.25。
低于此值可以输出到内存,否则输出到磁盘。
mapreduce.reduce.shuffle.merge.percent:默认值为0.9。
shuffle的数据量到Shuffle内存 * 0.9的时候,启动合并。

他的过程是最多5个Fetcher线程取拿取maptask输出的数据,Fetcher 线程获取Map的ShuffleHead信息后,通过调用merger.reserve(mapId, decompressedLength, id); 。merge返回InMemoryMapOutput或者是OnDiskMapOutput对象, 如果返回null

1` ```` `
,则再从ShuffleScheduler取新的任务。

Fetcher线程取到数据后,进行mapOutput的commit操作,说明信息读结束,这个mapOutput可以和其他的mapOutput进行合并。内存空间分给Fetcher后,状态变为allocated,commit后变为committed,只有commit状态的内存可以merge,

如果前4个Fetcher已经使用了全部的shuffle内存的99%,第5个Fetcher取的数据接近单个shuffle任务能使用的内存限额。没有fetcher commit。这时会为第5个fetcher分配25%的内存。使分配内存达到shuffle内存的124%,内存溢出。
前面的4个fetcher已经使用shuffle内存的89,并已经commit。这时Merge不会启动。最后一个Fetcher的数据量接近单个shuffle任务能使用的内存限额。这时总shuffle使用量为114%,内存溢出。

MergeManager的reserve的处理如下:

public synchronized MapOutput<K,V> reserve(TaskAttemptID mapId, 
long requestedSize,
int fetcher
) throws IOException {
if (requestedSize > maxSingleShuffleLimit) {
return new OnDiskMapOutput<K,V>(mapId, reduceId, this, requestedSize,
jobConf, mapOutputFile, fetcher, true);
}

if (usedMemory > memoryLimit) {
return null;
}

return unconditionalReserve(mapId, requestedSize, true);
}
unconditionalReserve方法如下: 增加usedMemory,返回InMemoryMapOutput对象。

 private synchronized InMemoryMapOutput<K, V> unconditionalReserve(
TaskAttemptID mapId, long requestedSize, boolean primaryMapOutput) {
usedMemory += requestedSize;
return new InMemoryMapOutput<K,V>(jobConf, mapId, this, (int)requestedSize,
codec, primaryMapOutput);
}
此代码问题如下:

如果前4个Fetcher已经使用了全部的shuffle内存的99%,第5个Fetcher取的数据接近单个shuffle任务能使用的内存限额。没有fetcher commit。这时会为第5个fetcher分配25%的内存。使分配内存达到shuffle内存的124%,内存溢出。
前面的4个fetcher已经使用shuffle内存的89,并已经commit。这时Merge不会启动。最后一个Fetcher的数据量接近单个shuffle任务能使用的内存限额。这时总shuffle使用量为114%,内存溢出。
为了解决此问题,需要进行调整。目标是Shuffle内存占用总内存的比例不能超过70%,否则会出现OutOfMemoryError

方案一
保持shuffle内存0.7不变,则commit内存改为 0.75。同时修改reserve程序。
在reserve方法,如果usedMemory 小于Shuffle内存的75%总是能分配成功。
当大于75%的时候,则可能成功,也可能失败。但是当已经分配75%的时候,当这些Fetcher的任务结束commit 内存时,总能触发merge操作。merge后会释放内存。
为了提高系统效率,可以设置mapreduce.reduce.shuffle.merge.percent 为0.5,commit内存到0.5的时候,则启动merge,这时和fetcher申请内存时冲突的机会降低。即便 Reduce为2G内存,则merge时的数据量最少为: 2G * 0.7 * 0.5 为700MB。如果 Reduce 增大,则一次Merge的数量量更多。

    mapreduce.reduce.shuffle.input.buffer.percent:
shuffle使用的内存比例,设置为0.7。
mapreduce.reduce.shuffle.memory.limit.percent: 
单个shuffle任务能使用的内存限额,设置为0.25,即为 Shuffle内存 * 0.25。
低于此值可以输出到内存,否则输出到磁盘。
mapreduce.reduce.shuffle.merge.percent:设置为0.75。
shuffle的数据量到Shuffle内存 ** 0.75的时候,启动合并。

reserve方法改为

    if (usedMemory  + requestedSize > memoryLimit) {   // 原来为  if (usedMemory  > memoryLimit) {
return null;
}
方案二
shuffle内存 比例0.6,单个shuffle最大为0.15, 则merge的内存比例不用改,reserve方法不用改. 这种方案shuffle内存分配到接近100%时,最多可以分配15%的shuffle内存。总得Shuffle内存不超过0.6 + 0.6 * 0.15 = 0.69。
在reserve方法,如果usedMemory 小于Shuffle内存的100%总是能分配成功,否则失败。但是当已经分配100%的时候,当这些Fetcher的任务结束commit 内存时,总能触发merge操作。merge后会释放内存。

    mapreduce.reduce.shuffle.input.buffer.percent:
shuffle使用的内存比例0.6。
mapreduce.reduce.shuffle.memory.limit.percent: 
单个shuffle任务能使用的内存限额,设置为0.15,即为 Shuffle内存 * 0.15。
低于此值可以输出到内存,否则输出到磁盘。
mapreduce.reduce.shuffle.merge.percent:设置为0.9。
shuffle的数据量到Shuffle内存 ** 0.9的时候,启动合并。

代码交流 2021