目前,大数据计算引擎主要用 Java 或是基于 JVM 的编程语言实现的,例如 Apache Hadoop、Apache Spark、Apache Drill、Apache Flink等。Java语言的好处在于程序员不需要太关注底层内存资源的管理,但同样会面临一个问题,就是如何在内存中存储大量的数据(包括缓存和高效处理)。Flink使用自主的内存管理,来避免这个问题。
JVM内存管理的不足:
1)Java 对象存储密度低。Java的对象在内存中存储包含3个主要部分:对象头、实例数据、对齐填充部分。例如,一个只包含 boolean 属性的对象占16byte:对象头占8byte,boolean 属性占1byte,为了对齐达到8的倍数额外占7byte。而实际上只需要一个bit(1/8字节)就够了。
2)Full GC 会极大地影响性能。尤其是为了处理更大数据而开了很大内存空间的JVM来说,GC 会达到秒级甚至分钟级。
3)OOM 问题影响稳定性。OutOfMemoryError是分布式计算框架经常会遇到的问题,当JVM中所有对象大小超过分配给JVM的内存大小时,就会发生OutOfMemoryError错误,导致JVM崩溃,分布式框架的健壮性和性能都会受到影响。
4)缓存未命中问题。CPU进行计算的时候,是从CPU缓存中获取数据。现代体系的CPU会有多级缓存,而加载的时候是以Cache Line为单位加载。如果能够将对象连续存储,这样就会大大降低CacheMiss。使得CPU集中处理业务,而不是空转。(Java对象在堆上存储的时候并不是连续的,所以从内存中读取Java对象时,缓存的邻近的内存区域的数据往往不是CPU下一步计算所需要的,这就是缓存未命中。此时CPU需要空转等待从内存中重新读取数据。
Flink 并不是将大量对象存在堆内存上,而是将对象都序列化到一个预分配的内存块上,这个内存块叫做 MemorySegment,它代表了一段固定长度的内存(默认大小为 32KB),也是 Flink中最小的内存分配单元,并且提供了非常高效的读写方法,很多运算可以直接操作二进制数据,不需要反序列化即可执行。每条记录都会以序列化的形式存储在一个或多个MemorySegment中。
内存模型1、 JobManager内存模型
在1.10中,Flink 统一了 TM 端的内存管理和配置,相应的在1.11中,Flink 进一步对JM 端的内存配置进行了修改,使它的选项和配置方式与TM 端的配置方式保持一致。
2、 TaskManager内存模型
Flink 1.10 对TaskManager的内存模型和Flink应用程序的配置选项进行了重大更改,让用户能够更加严格地控制其内存开销。
TaskExecutorFlinkMemory.java
JVM Heap:JVM堆上内存(1)Framework HeapMemory:Flink框架本身使用的内存,即TaskManager本身所占用的堆上内存,不计入Slot的资源中。
配置参数:taskmanager.memory.framework.heap.size=128MB,默认128MB
(2)Task Heap Memory:Task执行用户代码时所使用的堆上内存。
配置参数:taskmanager.memory.task.heap.size
Off-Heap Mempry:JVM堆外内存(1)DirectMemory:JVM直接内存
① FrameworkOff-Heap Memory:Flink框架本身所使用的内存,即TaskManager本身所占用的对外内存,不计入Slot资源。
配置参数:taskmanager.memory.framework.off-heap.size=128MB,默认128MB
②Task Off-HeapMemory:Task执行用户代码所使用的对外内存。
配置参数:taskmanager.memory.task.off-heap.size=0,默认0
③Network Memory:网络数据交换所使用的堆外内存大小,如网络数据交换缓冲区
配置参数:
taskmanager.memory.network.fraction:0.1
taskmanager.memory.network.min:64mb
taskmanager.memory.network.max:1gb
(2)Managed Memory:Flink管理的堆外内存,用于排序、哈希表、缓存中间结果及 RocksDB StateBackend 的本地内存。
配置参数:
taskmanager.memory.managed.fraction=0.4
taskmanager.memory.managed.size
JVM specific memory:JVM本身使用的内存(1)JVM metaspace:JVM元空间
(2)JVM over-head执行开销:JVM执行时自身所需要的内容,包括线程堆栈、IO、编译缓存等所使用的内存。
配置参数:
taskmanager.memory.jvm-overhead.min=192mb
taskmanager.memory.jvm-overhead.max=1gb
taskmanager.memory.jvm-overhead.fraction=0.1
总体内存(1)总进程内存:Flink Java应用程序(包括用户代码)和JVM运行整个进程所消耗的总内存。
总进程内存=Flink使用内存 + JVM元空间 + JVM执行开销
配置项:taskmanager.memory.process.size:1728m
(2)Flink总内存:仅Flink Java应用程序消耗的内存,包括用户代码,但不包括JVM为其运行而分配的内存
Flink使用内存:框架堆内外 + task堆内外 + network + manage
配置项:taskmanager.memory.flink.size:1280m
说明:配置项详细信息查看如下链接
https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/config.html#memory-configuration
3、 内存分配
(1)JobManager内存分配
YarnClusterDescriptor.java
JobManagerProcessUtils.java
ProcessMemoryUtils.java
JobManagerFlinkMemoryUtils.java
2、TaskManager内存分配
ActiveResourceManager.java
TaskExecutorProcessUtils.java
内存数据结构内存段内存段在 Flink 内部叫 MemorySegment,是 Flink 中最小的内存分配单元,默认大小32KB。它即可以是堆上内存(Java的byte数组),也可以是堆外内存(基于Netty的DirectByteBuffer),同时提供了对二进制数据进行读取和写入的方法。
HeapMemorySegment:用来分配堆上内存
HybridMemorySegment:用来分配堆外内存和堆上内存,2017年以后的版本实际上只使用了HybridMemorySegment。
如下图展示一个内嵌型的Tuple3
可以看出这种序列化方式存储密度是相当紧凑的。其中 int 占4字节,double 占8字节,POJO多个一个字节的header,PojoSerializer只负责将header序列化进去,并委托每个字段对应的serializer对字段进行序列化。
内存页内存页是MemorySegment之上的数据访问视图,数据读取抽象为DataInputView,数据写入抽象为DataOutputView。使用时就无需关心MemorySegment的细节,会自动处理跨MemorySegment的读取和写入。
BufferTask算子之间在网络层面上传输数据,使用的是Buffer,申请和释放由Flink自行管理,实现类为NetworkBuffer。1个NetworkBuffer包装了1个MemorySegment。同时继承了AbstractReferenceCountedByteBuf,是Netty中的抽象类。
Buffer资源池BufferPool用来管理Buffer,包含Buffer的申请、释放、销毁、可用Buffer通知等,实现类是LocalBufferPool,每个Task拥有自己的LocalBufferPool。
BufferPoolFactory用来提供BufferPool的创建和销毁,唯一的实现类是NetworkBufferPool,每个TaskManager只有一个NetworkBufferPool。同一个TaskManager上的Task共享NetworkBufferPool,在TaskManager启动的时候创建并分配内存。
内存管理器MemoryManager用来管理Flink中用于排序、Hash表、中间结果的缓存或使用堆外内存的状态后端(RocksDB)的内存。
1.10之前版本,负责TaskManager所有内存。
1.10版本开始,管理范围是Slot级别。
堆外内存资源申请:MemoryManager.java
MemorySegmentFactory.java
RocksDB自己负责内存申请和释放RocksDBOperationUtils.java
MemoryManager.java
网络传输中的内存管理网络上传输的数据会写到 Task 的 InputGate(IG)中,经过 Task 的处理后,再由 Task 写到 ResultPartition(RS)中。每个 Task 都包括了输入和输入,输入和输出的数据存在 Buffer 中(都是字节数据)。Buffer 是 MemorySegment 的包装类。
1)TaskManager(TM)在启动时,会先初始化NetworkEnvironment对象,TM 中所有与网络相关的东西都由该类来管理(如 Netty 连接),其中就包括NetworkBufferPool。根据配置,Flink 会在NetworkBufferPool 中生成一定数量(默认2048)的内存块MemorySegment(关于 Flink 的内存管理,后续文章会详细谈到),内存块的总数量就代表了网络传输中所有可用的内存。NetworkEnvironment 和 NetworkBufferPool 是 Task 之间共享的,每个 TM 只会实例化一个。
2)Task 线程启动时,会向 NetworkEnvironment 注册,NetworkEnvironment 会为 Task 的 InputGate(IG)和 ResultPartition(RP) 分别创建一个 LocalBufferPool(缓冲池)并设置可申请的 MemorySegment(内存块)数量。IG 对应的缓冲池初始的内存块数量与 IG 中 InputChannel数量一致,RP 对应的缓冲池初始的内存块数量与 RP 中的 ResultSubpartition 数量一致。不过,每当创建或销毁缓冲池时,NetworkBufferPool会计算剩余空闲的内存块数量,并平均分配给已创建的缓冲池。注意,这个过程只是指定了缓冲池所能使用的内存块数量,并没有真正分配内存块,只有当需要时才分配。为什么要动态地为缓冲池扩容呢?因为内存越多,意味着系统可以更轻松地应对瞬时压力(如GC),不会频繁地进入反压状态,所以我们要利用起那部分闲置的内存块。
3)在 Task 线程执行过程中,当 Netty 接收端收到数据时,为了将 Netty 中的数据拷贝到 Task 中,InputChannel(实际是 RemoteInputChannel)会向其对应的缓冲池申请内存块(上图中的①)。如果缓冲池中也没有可用的内存块且已申请的数量还没到池子上限,则会向 NetworkBufferPool 申请内存块(上图中的②)并交给 InputChannel 填上数据(上图中的③和④)。如果缓冲池已申请的数量达到上限了呢?或者 NetworkBufferPool 也没有可用内存块了呢?这时候,Task 的 NettyChannel 会暂停读取,上游的发送端会立即响应停止发送,拓扑会进入反压状态。当 Task 线程写数据到ResultPartition 时,也会向缓冲池请求内存块,如果没有可用内存块时,会阻塞在请求内存块的地方,达到暂停写入的目的。
4)当一个内存块被消费完成之后(在输入端是指内存块中的字节被反序列化成对象了,在输出端是指内存块中的字节写入到 Netty Channel 了),会调用 Buffer.recycle() 方法,会将内存块还给 LocalBufferPool (上图中的⑤)。如果LocalBufferPool中当前申请的数量超过了池子容量(由于上文提到的动态容量,由于新注册的 Task 导致该池子容量变小),则LocalBufferPool会将该内存块回收给 NetworkBufferPool(上图中的⑥)。如果没超过池子容量,则会继续留在池子中,减少反复申请的开销。
反压的过程1)记录“A”进入了 Flink 并且被 Task 1 处理。(这里省略了 Netty 接收、反序列化等过程)
2)记录被序列化到buffer 中。
3)该 buffer 被发送到 Task 2,然后 Task 2 从这个 buffer 中读出记录。
记录能被 Flink 处理的前提是:必须有空闲可用的 Buffer。
结合上面两张图看:Task 1 在输出端有一个相关联的 LocalBufferPool(称缓冲池1),Task 2 在输入端也有一个相关联的 LocalBufferPool(称缓冲池2)。如果缓冲池1中有空闲可用的 buffer 来序列化记录 “A”,我们就序列化并发送该buffer。
注意两个场景:
1)本地传输:如果 Task1 和 Task 2 运行在同一个 worker 节点(TaskManager),该 buffer 可以直接交给下一个 Task。一旦 Task 2 消费了该 buffer,则该 buffer 会被缓冲池1回收。如果 Task 2 的速度比 1 慢,那么 buffer 回收的速度就会赶不上 Task 1 取 buffer 的速度,导致缓冲池1无可用的 buffer,Task 1 等待在可用的 buffer 上。最终形成 Task 1 的降速。
2)远程传输:如果 Task1 和 Task 2 运行在不同的 worker 节点上,那么 buffer 会在发送到网络(TCP Channel)后被回收。在接收端,会从 LocalBufferPool 中申请 buffer,然后拷贝网络中的数据到 buffer 中。如果没有可用的 buffer,会停止从 TCP 连接中读取数据。在输出端,通过 Netty 的水位值机制来保证不往网络中写入太多数据(后面会说)。如果网络中的数据(Netty输出缓冲中的字节数)超过了高水位值,我们会等到其降到低水位值以下才继续写入数据。这保证了网络中不会有太多的数据。如果接收端停止消费网络中的数据(由于接收端缓冲池没有可用 buffer),网络中的缓冲数据就会堆积,那么发送端也会暂停发送。另外,这会使得发送端的缓冲池得不到回收,writer 阻塞在向 LocalBufferPool 请求 buffer,阻塞了 writer 往 ResultSubPartition 写数据。
这种固定大小缓冲池就像阻塞队列一样,保证了 Flink 有一套健壮的反压机制,使得 Task 生产数据的速度不会快于消费的速度。我们上面描述的这个方案可以从两个Task 之间的数据传输自然地扩展到更复杂的 pipeline 中,保证反压机制可以扩散到整个 pipeline。