Flink源代码阅读总结
2017-11-21 17:07:09    1160    0    0
litm7@qq.com

Flink源代码阅读总结

 

第一步:(一周时间)阅读了flink的http://flink.apache.org/  网站上的所有文档,英文的理解了一个大概,有一个直接的架构印象。找到相关模块的中文资料和架构图,重新看别人理解的这些英文文档的和翻译的文档对照一下理解的差异。此处已经形成了整体的关键一些知识的影响。

第二步:(1周时间)搭建环境,在本地运行local模式的flink然后运行client端提交一个example的例子,看执行的过程。阅读client相关的源代码,看client形成一个job提交到后台local的jobmanager到taskmanager的过程。

第三部:(1周时间)跟进到某个example的main函数的例子中,看一个flink的运行的处理代码是如何被解析成streamgraph,到jobgraph最终被提交到jobmanager执行的。

我到这里基本上就陷入到从streamgraph到jobgraph的编译的过程中的代码海洋里面,debug了一段时间跟踪整个过程,看jobgraph对象形成的过程,有一个初步的印象,理解了一个大概。

 

按到道理接下来应该是继续看jobmanager以及taskmanager,但是上面的代码还没有理解透,几句没有贸然向下走了。

重新定义阅读的目标:把形成jobgraph的过程中的类之间的关系细节看清楚。于是准备单独构造一个新的代码crazystreamclient逐步从client到加载jar运行,这个代码够了2周时间,发现最后也陷入到代码的海洋里面了。接下来准备整理一下之前的收获的概念,以及把类关系整理出来,便于记忆和继续演绎理解,再进行归纳理解。

好吧,基本概念我就跳过去,直接从代码的关键类和类关系进行梳理,有时间再把概念整理补上。

 

1.0  将example中的env.execute中执行F7进去后的环境类StreamExecutionEnvironment

    首先看一下这个类所在的flink源代码的目录:flink-streamming-java/src/main/java/org/apache/flink/steamming/api/environment/

 

              这个目录下一共放了8个class,这个目录中的8个class的类关系图如下:说明StreamExecutionEnvironment是几个Environment的父亲类;还有一个Env的工厂类接口。

       

     

下面是类StreamExcutionEnvironment的12个成员变量:具体说明如下:

改对象一共有12个成员变量和5组类型的成员方法;提供执行入口,以及创建source源,返回DataStreamSource对象让执行继续迭代。

 

下面继续说明StreamExcutionEnvironment的成员方法:说明:

 

 

 

下面看一下StreamExcutionEnvironment中的成员类ExecutionConfig类:

目前理解改成员主要设置配置序列化的类和类型、并行度、代码分析模式开关、全局的job参数

其中ExecutionMode是一个枚举类型,一共有4种模式:PIPELINED、PIPELINED_FORCED、BATCH、BATCH_FORCED(具体这四种执行模式的含义后面再深入分析,此处做一个标记????????)

其中CodeAnalysisMode也是一个枚举类型,一共有3种模式:DISABLE(关闭代码分析模式)、HINT(将分析结果打印出来)、OPTIMIZE(启动代码优化)

其中RestartStrategies.RestartStrategyConfiguration是一个重启策略的嵌套类,在该子类中也嵌套了子类的3实现类。RestartStrategyConfigurations是用于与核心模块解耦的设计模式,在该类中定义了3个重启策略子类。比如失败重启、修复重启、没有重启、对于修复重启fixedDelayRestart,则在实现子类中设置了重启的尝试次数,以及每次重启的时间间隔。

其中GlobalJobParameters是一个ExecutionConfig类的内部嵌套类,它是一个用户KV配置全局参数的基类,其重要的一个子类是Configuration,这个子类的子类是:DelegatingConfiguration 和UnmodifiableConfiguration;这些子类都会有一个成员变量protected final HashMap<String, Object> confData;,这个KV结构将存储全局的Job参数配置。

 

继续分析这个ExecutionConfig类中的6个private的HashMap

先看一下LinkedHashMap(HashMap子类,按照插入的顺序遍历,在数据量大而且稀疏的时候比HashMap遍历起来要快。参考)中的Value的两个类:SerializableSerializer   和  Serializer

    SerializableSerializer 是一个ExecutionConfig的内部嵌套类:

   下面是这个类的定义,创建这个对象会存储一个外部类型的T的序列化对象,并且提供了Get函数。    

     Serializer<T> 类是一个三方包kryo-2.24.0.jar通过maven引入进来的,是一个序列化对象的抽象类,具体的序列化对象可以实现它。

通过类的calss作为KEY来查找具体的序列化对象。

 

下面继续分析这个CheckpointConfig类,这个类也是StreamExcutionEnvironment的成员类,是flink 的Chekpoint相关的配置参数的类

这个类里面主要的成员就2个枚举成员enum,一个是CheckpointingMode,这个是设置checkpoint的运行模式,缺省是恰好一次。另外一个类是一个扩展的checkpoint的清除类。

 

 

下面继续分析StreamExcutionEnvironment成员的另外一个重要成员变量:List<StreamTransformation>

 

 StreamTransformation 是一个基础类,派生了Source,SideOutput,OneInput,Sink,TwoInput,Split,Feedback,Partition,Select,Union,CoFeedback

该基础类实现了一些get或者Set方法,有2个方法是需要继承者的子类自己实现的

public abstract void setChainingStrategy(ChainingStrategy strategy);

public abstract Collection<StreamTransformation<?>> getTransitivePredecessors();

 

 

 接下来在继续研究private AbstractStateBackend defaultStateBackend;  这个成员变量:

这个是一个集成了接口StateBackend的虚拟类,

简单的看了一些这个虚类,然后找了几个继承它实现的子类看了一下,感觉稍微有点晕,还没有彻底看明白。只是凭猜测是实现了一套将checkpoint的状态序列化到不同的存储上的能力,以及恢复的能力。

先看看接口StateBackend这个接口吧:

1、这个接口继承自java的序列化接口,可以序列化,

2、拥有4个方法


/** * Creates a {@link CheckpointStreamFactory} that can be used to create streams 
*     that should end up in a checkpoint.       创建一个CheckpointStreamFactory通常用在一个检查点上创建streams流,
* *     @param jobId              The {@link JobID} of the job for which we are creating checkpoint streams. 这个我们创建checkpoint的streams流的jobid
*       @param operatorIdentifier An identifier of the operator for which we create streams. 一个我们创建流的操作id
*/
//这个方法通过2个参数创建了一个检查点的流创建工厂类,为了后续创建streams流
CheckpointStreamFactory createStreamFactory(JobID jobId, String operatorIdentifier) throws IOException; 

 

 


/** * Creates a new {@link AbstractKeyedStateBackend} that is responsible for holding <b>keyed state</b> * and checkpointing it. 
*  * <p><i>Keyed State</i> is state where each value is bound to a key. 
*  * @param env 
*  * @param jobID 
*  * @param operatorIdentifier 
*  * @param keySerializer 
*  * @param numberOfKeyGroups 
*  * @param keyGroupRange 
*  * @param kvStateRegistry 
*  * @param <K> The type of the keys by which the state is organized. 
*  * @return The Keyed State Backend for the given job, operator, and key group range. 
*  * @throws Exception This method may forward all exceptions that occur while instantiating the backend. */
<K> AbstractKeyedStateBackend<K> 
//这个方法创建了一个保存keyed State和Checkpoint的类
createKeyedStateBackend(      Environment env,                                      JobID jobID,                                      String operatorIdentifier,                                      TypeSerializer<K> keySerializer,                                      int numberOfKeyGroups,                                      KeyGroupRange keyGroupRange,                                      TaskKvStateRegistry kvStateRegistry) throws Exception; 

 

 


/** * Creates a new {@link OperatorStateBackend} that can be used for storing operator state. 
*  * <p>Operator state is state that is associated with parallel operator (or function) instances, 
* rather than with keys. 
*  * @param env The runtime environment of the executing task. 
* @param operatorIdentifier The identifier of the operator whose state should be stored. 
*  * @return The OperatorStateBackend for operator identified by the job and operator identifier. 
*  * @throws Exception This method may forward all exceptions that occur while instantiating the backend. */

//创建了一个用来存储的operator状态。Operator state是一个和并行运算符相关的状态。
OperatorStateBackend createOperatorStateBackend(Environment env, String operatorIdentifier) throws Exception; 

 

 

3、看一下英文的对这个接口的注释,我翻译一下

/** * A State Backend defines how the state of a streaming application is stored and * checkpointed. Different State Backends store their state in different fashions, and use * different data structures to hold the state of a running application. * *

一个状态后端定义了如何存储和checkpoint流应用程序的状态。不同的状态后端有不同的存储方式,用不同的数据接口保持运行应用程序的状态。

For example, the {@link org.apache.flink.runtime.state.memory.MemoryStateBackend memory state backend} * keeps working state in the memory of the TaskManager and stores checkpoints in the memory of the * JobManager. The backend is lightweight and without additional dependencies, but not highly available * and supports only small state. * *

例如,这个MemoryStateBackend这个子类保存工作的状态在TsakManager的内存中,和存储checkpoint在JobManager的内存中。这个后端是轻量级和不需要附加依赖的,并且不是高可用的,仅仅支持小状态存储。

The {@link org.apache.flink.runtime.state.filesystem.FsStateBackend file system state backend} * keeps working state in the memory of the TaskManager and stores state checkpoints in a filesystem * (typically a replicated highly-available filesystem, like HDFS, * Ceph, S3, * GCS, etc). * *

这个FsSatteBackend的子类保存工作状态在TaskManager的内存中,并且存储checkpoint状态在文件系统(典型的高可用文件系统像HDF,Ceph,S3

,GCS 等)中。

The {@code RocksDBStateBackend} stores working state in RocksDB, * and checkpoints the state by default to a filesystem (similar to the {@code FsStateBackend}). * *

这个RocksDBStateBackedn 存储工作状态在RocksDB中,并且checkpoints状态存储在文件系统中类似fsStateBackend

 

Raw Bytes Storage and Backends

* * The {@code StateBackend} creates services for raw bytes storage and for keyed state * and operator state. * *

这个StateBackend为原始字节,关键状态,操作状态创建服务。

The raw bytes storage (through the {@link CheckpointStreamFactory}) is the fundamental * service that simply stores bytes in a fault tolerant fashion. This service is used by the JobManager * to store checkpoint and recovery metadata and is typically also used by the keyed- and operator state * backends to store checkpointed state. * *

这个原始字节存储(CheckpointStreamFactory)是一个以简单容错方式提供的基本服务.JobManager使用此服务来存储检查点和恢复元数据,通常也由密钥和操作员状态后端使用来存储检查点状态。

The {@link AbstractKeyedStateBackend} and {@link OperatorStateBackend} created by this state * backend define how to hold the working state for keys and operators.  通过Factory对2个抽象的类AbstrractKeyedStateBackend和OperatorStateBackend的子类进行创建,他们2个来保存这个keys和operators的工作状态。

They also define how to checkpoint * that state, frequently using the raw bytes storage (via the {@code CheckpointStreamFactory}). * However, it is also possible that for example a keyed state backend simply implements the bridge to * a key/value store, and that it does not need to store anything in the raw byte storage upon a * checkpoint. * *

他们也定义了怎样处理检查点的状态,通常是用原始存储通过CheckpointStreamFactor来完成。然而,它是它也是可能例如一个key后端状态简单实例化桥接到一个kv存储上,它并不需要存储任何事情到原始存储上去。

 

 

Serializability

串行化

* * State Backends need to be {@link java.io.Serializable serializable}, because they distributed * across parallel processes (for distributed execution) together with the streaming application code. * *

状态后端需要序列化,因为他们和流计算程序并行执行在一起执行。

Because of that, {@code StateBackend} implementations (typically subclasses * of {@link AbstractStateBackend}) are meant to be like factories that create the proper * states stores that provide access to the persistent storage and hold the keyed- and operator * state data structures. That way, the State Backend can be very lightweight (contain only * configurations) which makes it easier to be serializable. * * *

因此,{@code StateBackend}实现(通常是{@link AbstractStateBackend}的子类)意味着像创建适当的状态存储的工厂,提供对持久存储的访问并保持key和运算符状态 数据结构。 这样,状态后端可以非常轻量级(仅包含配置),这使得它更易于串行化。

Thread Safety

线程安全

* * State backend implementations have to be thread-safe. Multiple threads may be creating * streams and keyed-/operator state backends concurrently. */

状态后端实现必须是线程安全的。多个线程可能同时创建流和key/operator状态后端。

 

上面可以看出stateBackend是定义了4个接口方法,由子类去实现,完成相应的factory或者状态或者key的保存,下面继续分析一下其中一个虚基类AbstractStateBackend的实现。

 


这个AbstractStateBackend类比其接口多实现了2个静态方法,目的是为了通过config来加载具体的子类。

 


public static StateBackend loadStateBackendFromConfig(      Configuration config,      ClassLoader classLoader,      @Nullable Logger logger) throws IllegalConfigurationException, DynamicCodeLoadingException, IOException {

这个方法根据config文件中的配置来switch case的方式选择加载MemoryStateBackend、FsStateBackend、RocksDBStateBackendFactory,这3个类都是其子类,第一个是内存保存StateBackedn,第二个是文件系统保存,第三个是通过RocksDB来保存

 

这个3个展开又要有不少细节可以看了,慢慢来:

先从MemoryStateBackend来解剖吧:

    1、在解析这个类前还要把其中用到的几个类深入解析一下MemCheckpointStreamFactory 继承自接口CheckpointStreamFactory

          这个类{@link CheckpointStreamFactory} that produces streams that write to in-memory byte arrays. 生成写入内存中字节数组的流。

          这个类factory最重要的方法是createCheckpointStateOutputStream,这个方法返回值是一个CheckpointStateOutputStream类,里面new

一个它的子类MemoryCheckpointOutputStream

           这个子类是MemCheckpointStreamFactory的内部公开静态嵌套类。这个内部嵌套类中最重要的成员对象是ByteArrayOutputStreamWithPos

 

         这个ByteArrayOutputStreamWithPos继承自OutputStream,这个Outputstream是一个经常在各个类出现的父类,对应的还有一个InputStream类。这个类是所有输出字节流的父亲类,可以接收一个输出bytes并且输出到sink上去。应用程序需要定义一个OutputStream的子类并且至少实现一个write写字节的方法。这个虚拟类3个write方法,其中一个是底层的write方法需要子类去实现,另外2个方法一个是flush和close方法,在这里他们2个都是空实现。

         ByteArrayOutputStreamWithPos是Un-synchronized stream similar to Java's ByteArrayOutputStream that also exposes the current position.不同步的流类似于java的ByteArraOutputStream,需要当前的位置。这个类里面为了2个成员变量,就是内存字节数组和count值。

     

 

           ByteArrayOutputStreamWithPos这个类主要比父类多实现了如何自动扩大这个buffer的大小,以及write具体写入buffer的方法。而MemoryCheckpointOutputStream类则完成了对ByteArrayOutputStreamWithPos的封装。其中涉及到2个新类,具体截图方法如下:

         

                  ByteStreamStateHandle是StreamStateHandle的子类。这个StreamStateHandle是一个接口,声明了一个方法FSDataInputStream openInputStream() throws IOException;,这个方法 打开一个输入stream流返回一个FSDataInputStream对象,这个对象是InputStream的子类。这个InputStream和上面提到的OutputStream是一对,不过这个类是完成read方法,这个类有必须有一个虚方法需要实现,这个read读下一个byte的数据从input stream中。这个返回一个int。当然该类中还实现了其他几个方法,例如skip,close,reset等

 

 

            在其子类FSDataInputStream中又多定义了2个抽象方法seek和getPos,这是需要其子类实现的,分别是seek方法是根据传入的int的位置将读的指针位置移动向buffer的指定位置,在接下来read的时候从这个地方开始。getPos是返回当前阅读开始的位置。

            ByteStreamStateHandle类继承自StreamStateHandle,并且内部嵌套实现了一个FSDataInputStream的子类ByteStateHandleInputStream,这个内部嵌套类实现了seek,getPos,read方法。ByteStreamStateHandle类中定义了需要存储的数据的成员变量protected final byte[] data; 状态state数据通过这个byte数组来存储起来。ByteStreamStateHandle还有一个成员变量handleName,通过这个handleName来判断两个ByteStreamStateHandle是相同。

            继续返回说这个MemCheckpointStreamFactory工程类,创建它返回他父类的句柄CheckpointStreamFactory,这个类可以调用它的createCheckpointStateOutputStream() 方法返回一个CheckpointStateOutputStream类,这个类可以操作内存里面byte数组来完成对state的状态的保存提取等操作。

          2、下面接着分析另外一个相关的类DefaultOperatorStateBackend(Default implementation of OperatorStateStore that provides the ability to make snapshots,缺省的操作状态存储类提供了可以做快照的能力),这个类也是MemoryStateBackend中其中一个方法创建出来的类,其继承自接口OperatorStateBackend;而这个接口又继承自其他3个接口:OperatorStateStore,Snapshotable<OperatorStateHandle>,Closeable,这些接口都定义了一些方法比如:getListState,getUnionListState,getOperatorState,getSerializableListState  和snapshot,restore,和close 这些接口方法。而DefaultOperatorStateBackend类定义了几个重要的成员数据:           

           (1)、第一个成员private final Map<String, PartitionableListState<?>> registeredStates; 这个是通过name来获得一个分区列表的状态字段

             其中PartitionableListState是一个内部嵌套类继承自接口 ListState;这个类中private final TypeSerializer<S>  partitionStateSerializer; 

              保存了类型序列化的对象。下面就是TypeSerializer其子类的对象列表。可以看到很多类型都有自己的序列化对象。

                

             以其子类StringSerializer 为例,其继承自TypeSerializerSingleton,而这个父类是TypeSerializer在这个StringSerializer中实现了。

            在StringSerializer中实现了一些String的序列化和反序列化的方法,将String recored写入 DataOutputView中,其中使用到了StringValue类,这个应该是flink重写了String方法将数组模拟成String的方法来调用。StringValue中有一个重要的数据成员private char[] value; 在这个StringValue上的方法都是在操作这个value类,当然也有一些static方法,比如writeString就是操作参数中的数据,将String对象调用DataOutputView方法写入到target中去,反序列化正好想法,把DataInputView中的数据先读长度,在逐一字节读取后转换成String类型返回。其他Double,Byte等开头的Serializer都是类似做法,只是实际完成了这些序列化的方法。而参数中的DataOutputView 和DataInputView的interterface都分别继承自接口DataOutput和DataInput,可以read或者write具体的数据,实现他们的子类很多比如:AbstractPagedOutputView,HashPartition<BT, PT>,ChannelWriterOutputView,DataOutputSerializer,DataOutputViewStreamWrapper,FileChannelOutputView等等

                  

             继续类DefaultOperatorStateBackend的第二个成员变量。直接翻译英文是CloseableRegistry参与到task的生命周期中。我看了一下这个class是一个父类,这个应该是在Operator State backend的时候关闭task,解决任务安全关闭问题的类。

         这个类实现了父类的2个方法doRegister  和 doUnRegister,doRegister意识注册关闭closeable的object,doUnRegister是将关闭的object移除出regitry(map),其父类是AbstractCloseableRegistry,这个类的英文注释截图如下:

          这是一个抽象类允许哪些想要关闭的实例去注册确保最终关闭。所有操作都是线程安全的。

             这个抽象的类,主要是实现了线程安全的关闭Closeable对象的能力。除了CloseableRegistry继承自这个虚基类外,还有一个SafetyNetCloseableRegistry也继承自它。并且实现了网络的安全关闭能力;这个类内部有2个嵌套类,通过另外的线程来不断尝试安全关闭。如果仔细看 DefaultOperatorStateBackend这个类,使用的成员变量private final CloseableRegistry closeStreamOnCancelRegistry;的地方,都是打开了FSDataInputStream这些资源,需要确保能够有效的线程安全的关闭这些资源的句柄。

             

             记下来继续分析 DefaultOperatorStateBackend 类的另外一个成员变量private final JavaSerializer javaSerializer;看它的英文解释是缺省的序列化对象。仅仅用在缺省的操作状态获取的时候。

            

            还有一个成员变量是 ClassLoader,记录这次状态备份的线程需要用到的classloader的对象,在load state的对象的时候需要创建出来class对象

   

                             见类中对于userClassloader使用的地方,就是在设置当前线程的classloader,然后后面读取序列化的对象后就用这个加载class

 

              接下来继续分析 另外一个成员变量ExecutionConfig,这个类在之前的类中也有分析过,StreamExcutionEnvironment中的成员类ExecutionConfig类,可以回过去翻看上面。

 

            

                在DefaultOperatorStateBackend类中有一个内部嵌套的静态类 PartitionableListState,

 

 

             这个嵌套类实现了2个重要的方法getListState 和 deserializeStateValues 这些都是内部类的私有方法,供DefaultOperatorStateBackend类调用和封装,完成获得状态列表和反序列化状态值的一些操作。具体细节读取的数据需要debug的时候再补充进来。

            好的我们最后回顾一下这个类DefaultOperatorStateBackend,我们是通过MemoryStateBackend的createOperatorStateBackend方法new出来的,然后通过理解这段代码找到的这个类。

 

         在MemoryStateBackend 中还有一个方法createStreamFactory,里面创建了一个新的对象:MemCheckpointStreamFactory,这个对象是一个工程类,里面有一个嵌套的子类MemoryCheckpointOutputStream,用于内存Chekpoint流的写入,同步和关闭等操作。


              

 

 

Pre: Rxjava2的源代码解析(1)

Next: 给女儿的一封信

1160
Sign in to leave a comment.
No Leanote account? Sign up now.
0 comments
Table of content