Flink Source Code Analysis (3): Understanding Flink's Heartbeat Mechanism through the RM-TM Heartbeat Exchange
Based on Flink 1.12.
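Flink's underlying RPC is implemented on top of Akka, a framework built on the actor model, so a brief overview of that model comes first. In the actor model everything is an actor. An actor is a basic unit of computation, and actors are fully isolated from one another: they share no memory, so the concurrency problems that come with shared data do not arise. Each actor maintains its own state, which no other actor can modify directly. The overall picture is this: many actors run at the same time, and each one receives messages and reacts to them. Messages are sent to an actor asynchronously and stored in its mailbox, and the actor processes them one at a time in order, which avoids the need for locks. It follows that in the actor model the sender is decoupled from the messages it has sent, and work is processed concurrently.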
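The mailbox idea can be sketched in plain Java without Akka. This is only an illustration under stated assumptions: CounterActor, tell, and askSum are hypothetical names, and a single-threaded executor stands in for the mailbox.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.IntStream;

/** Hypothetical toy actor; this is not Akka's API. */
final class CounterActor {
    // The single-threaded executor's internal queue plays the role of the mailbox.
    private final ExecutorService mailbox = Executors.newSingleThreadExecutor();
    // Private state: only read or written on the mailbox thread, so no lock is needed.
    private int sum;

    /** Asynchronous send: enqueue the message and return at once. */
    void tell(int amount) {
        mailbox.execute(() -> sum += amount);
    }

    /** Request-reply: the answer is computed on the mailbox thread, after earlier messages. */
    CompletableFuture<Integer> askSum() {
        return CompletableFuture.supplyAsync(() -> sum, mailbox);
    }

    void stop() {
        mailbox.shutdown();
    }

    /** Many threads send concurrently; the actor still handles one message at a time. */
    static int demo() {
        CounterActor actor = new CounterActor();
        IntStream.rangeClosed(1, 100).parallel().forEach(actor::tell);
        int result = actor.askSum().join();
        actor.stop();
        return result;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints 5050
    }
}
```

The senders never touch sum directly; they only enqueue messages, which is why no synchronization appears in the actor itself.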
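The purpose of RPC is to make a remote call look like a local call by encapsulating the details of the invocation. Flink defines a Gateway for each component and hides the implementation details behind callbacks, which decouples the business logic from the communication layer and makes RPC calls easy to issue. At present the underlying transport for Flink's RPC requests is Akka.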
public abstract class RpcEndpoint implements RpcGateway, AutoCloseableAsync {
    // Used to start the server and to obtain RPC gateways
    /** RPC service to be used to start the RPC server and to obtain rpc gateways. */
    private final RpcService rpcService;
    // The RpcServer is used to start and connect to an RpcEndpoint; connecting to an RPC server
    // returns an RpcGateway. It serves RPCs for the RpcService and connects to remote servers.
    /** Interface to access the underlying rpc server. */
    protected final RpcServer rpcServer;
}
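How a gateway can make a remote call look local is easiest to see with a small proxy. The following is a minimal sketch, not Flink's RPC code: EchoGateway, EchoEndpoint, and ToyRpcService are hypothetical, an in-process executor replaces the network, and every gateway method is assumed to return a CompletableFuture.

```java
import java.lang.reflect.Proxy;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Hypothetical gateway: the only type a caller needs to know. */
interface EchoGateway {
    CompletableFuture<String> echo(String text);
}

/** Hypothetical endpoint holding the business logic. */
final class EchoEndpoint implements EchoGateway {
    @Override
    public CompletableFuture<String> echo(String text) {
        // Include the executing thread's name to show where the call really ran.
        return CompletableFuture.completedFuture(text + "@" + Thread.currentThread().getName());
    }
}

/** Toy stand-in for an RPC service; an in-process executor replaces the network. */
final class ToyRpcService {
    private final ExecutorService endpointThread =
            Executors.newSingleThreadExecutor(r -> new Thread(r, "endpoint-main"));

    /** Returns a proxy that turns each gateway call into a task for the endpoint thread. */
    <G> G connect(Class<G> gatewayType, G endpoint) {
        Object proxy = Proxy.newProxyInstance(
                gatewayType.getClassLoader(),
                new Class<?>[] {gatewayType},
                (self, method, args) -> {
                    CompletableFuture<Object> reply = new CompletableFuture<>();
                    endpointThread.execute(() -> {
                        try {
                            // Assumption: gateway methods return CompletableFuture.
                            CompletableFuture<?> result =
                                    (CompletableFuture<?>) method.invoke(endpoint, args);
                            result.whenComplete((value, error) -> {
                                if (error != null) {
                                    reply.completeExceptionally(error);
                                } else {
                                    reply.complete(value);
                                }
                            });
                        } catch (Exception e) {
                            reply.completeExceptionally(e);
                        }
                    });
                    return reply;
                });
        return gatewayType.cast(proxy);
    }

    void stop() {
        endpointThread.shutdown();
    }

    static String demo() {
        ToyRpcService rpc = new ToyRpcService();
        EchoGateway gateway = rpc.connect(EchoGateway.class, new EchoEndpoint());
        String reply = gateway.echo("ping").join(); // reads like a local call
        rpc.stop();
        return reply;
    }
}
```

The caller only sees EchoGateway; where and on which thread the method runs is decided by the proxy, which is the separation between business logic and communication described above.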
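HeartbeatTarget is the interface for a component that can send heartbeats and request heartbeat responses; it is an abstraction over any object capable of heartbeating. It exposes two actions: receiveHeartbeat, which delivers a heartbeat to the target, and requestHeartbeat, which asks the target to send a heartbeat back.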
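HeartbeatMonitor manages the heartbeat state of a HeartbeatTarget. If no heartbeat arrives within the configured time, the monitor notifies the corresponding HeartbeatListener; every heartbeat that does arrive resets its timer. Its factory interface is as follows: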
HeartbeatMonitor<O> createHeartbeatMonitor(
        ResourceID resourceID,
        HeartbeatTarget<O> heartbeatTarget,
        ScheduledExecutor mainThreadExecutor,
        HeartbeatListener<?, O> heartbeatListener, // handles heartbeat events
        long heartbeatTimeoutIntervalMs);
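HeartbeatListener is the interface through which business logic interacts with the HeartbeatManager. Flink components implement it to handle heartbeat results. Its three callbacks are notifyHeartbeatTimeout, reportPayload, and retrievePayload.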
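HeartbeatManager is the manager of heartbeats: it starts and stops heartbeat monitoring of a HeartbeatTarget and handles the heartbeat timeout of a node. HeartbeatManager extends HeartbeatTarget, so in addition to the HeartbeatTarget methods it declares four more: monitorTarget, unmonitorTarget, stop, and getLastHeartbeatFrom.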
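Roughly, the core interfaces interact as follows: the HeartbeatManager adds a HeartbeatTarget to its monitoring list; when a heartbeat times out, the HeartbeatMonitor notifies the HeartbeatListener; and the HeartbeatListener implementation carries out the logic for handling the timeout.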
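The interaction just described can be tried out with a small stand-alone model. It is a sketch under simplifying assumptions, not Flink's implementation: ToyHeartbeatManager is hypothetical, resource IDs are plain strings, the monitor is reduced to a scheduled timeout task, and the 500 ms timeout is chosen only to keep the demo short.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

/** Hypothetical, simplified manager: one timeout task per monitored target. */
final class ToyHeartbeatManager {
    /** Simplified listener with only the timeout callback. */
    interface Listener {
        void notifyHeartbeatTimeout(String resourceId);
    }

    private final Map<String, ScheduledFuture<?>> timeouts = new ConcurrentHashMap<>();
    private final ScheduledExecutorService timer;
    private final long timeoutMs;
    private final Listener listener;

    ToyHeartbeatManager(ScheduledExecutorService timer, long timeoutMs, Listener listener) {
        this.timer = timer;
        this.timeoutMs = timeoutMs;
        this.listener = listener;
    }

    /** Start monitoring: arm the first timeout for this target. */
    void monitorTarget(String resourceId) {
        armTimeout(resourceId);
    }

    /** A heartbeat arrived: replace the pending timeout with a fresh one. */
    void receiveHeartbeat(String resourceId) {
        if (timeouts.containsKey(resourceId)) {
            armTimeout(resourceId);
        }
    }

    private void armTimeout(String resourceId) {
        ScheduledFuture<?> next = timer.schedule(
                () -> listener.notifyHeartbeatTimeout(resourceId), timeoutMs, TimeUnit.MILLISECONDS);
        ScheduledFuture<?> previous = timeouts.put(resourceId, next);
        if (previous != null) {
            previous.cancel(false);
        }
    }

    /** Returns the targets that had timed out when the first timeout was observed. */
    static Set<String> demo() {
        ScheduledExecutorService timer = Executors.newScheduledThreadPool(2);
        Set<String> timedOut = ConcurrentHashMap.newKeySet();
        CountDownLatch firstTimeout = new CountDownLatch(1);
        ToyHeartbeatManager manager = new ToyHeartbeatManager(timer, 500L, id -> {
            timedOut.add(id);
            firstTimeout.countDown();
        });
        manager.monitorTarget("tm-alive");
        manager.monitorTarget("tm-silent");
        // Only tm-alive keeps sending heartbeats.
        timer.scheduleAtFixedRate(
                () -> manager.receiveHeartbeat("tm-alive"), 0L, 20L, TimeUnit.MILLISECONDS);
        try {
            firstTimeout.await(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        Set<String> snapshot = new HashSet<>(timedOut);
        timer.shutdownNow();
        return snapshot;
    }

    public static void main(String[] args) {
        System.out.println(demo().equals(Collections.singleton("tm-silent")));
    }
}
```

Under normal scheduling only the silent target is reported, because each heartbeat from the other target pushes its deadline further out.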
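The following sections walk through the implementation classes of the core interfaces from section 1.3.1 to examine heartbeat handling in detail.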
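HeartbeatManagerImpl maintains a map from resource IDs to heartbeat monitor objects (HeartbeatMonitor). When a new heartbeat arrives, the corresponding monitor is updated; when a heartbeat times out, the HeartbeatListener is notified.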
public class HeartbeatManagerImpl<I, O> implements HeartbeatManager<I, O> {
    // Heartbeat timeout (not the sending interval)
    /** Heartbeat timeout interval in milli seconds. */
    private final long heartbeatTimeoutIntervalMs;
    // Listener that handles heartbeat events
    /** Heartbeat listener with which the heartbeat manager has been associated. */
    private final HeartbeatListener<I, O> heartbeatListener;
    // Map from resource ID to heartbeat monitor; monitorTarget puts an entry into this map
    /** Map containing the heartbeat monitors associated with the respective resource ID. */
    private final ConcurrentHashMap<ResourceID, HeartbeatMonitor<O>> heartbeatTargets;
    /** Running state of the heartbeat manager. */
    protected volatile boolean stopped;
    // ... (remaining fields and methods omitted)
}
The main methods implemented by HeartbeatManagerImpl include monitorTarget, receiveHeartbeat, and requestHeartbeat.
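HeartbeatManagerSenderImpl extends HeartbeatManagerImpl and requests heartbeat responses from the heartbeatTarget objects it monitors; in other words, it actively triggers heartbeat requests. It implements Runnable, and its run method iterates over the heartbeat monitors and calls requestHeartbeat() to ask each node for a heartbeat.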
public class HeartbeatManagerSenderImpl<I, O> extends HeartbeatManagerImpl<I, O> implements Runnable {
    @Override
    public void run() {
        if (!stopped) {
            log.debug("Trigger heartbeat request.");
            for (HeartbeatMonitor<O> heartbeatMonitor : getHeartbeatTargets().values()) {
                requestHeartbeat(heartbeatMonitor);
            }
            // Reschedule itself periodically; the period is configurable
            getMainThreadExecutor().schedule(this, heartbeatPeriod, TimeUnit.MILLISECONDS);
        }
    }

    // Actively issue a heartbeat request
    private void requestHeartbeat(HeartbeatMonitor<O> heartbeatMonitor) {
        O payload = getHeartbeatListener().retrievePayload(heartbeatMonitor.getHeartbeatTargetId());
        final HeartbeatTarget<O> heartbeatTarget = heartbeatMonitor.getHeartbeatTarget();
        // Call requestHeartbeat on the target
        heartbeatTarget.requestHeartbeat(getOwnResourceID(), payload);
    }
}
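The self-rescheduling pattern in run() can be isolated in a few lines. This is a hedged sketch rather than Flink code: ToyHeartbeatSender and its constructor parameters are hypothetical, targets are plain strings, and the 10 ms period is chosen only for the demo.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

/** Hypothetical, simplified sender; names are illustrative, not Flink's API. */
final class ToyHeartbeatSender implements Runnable {
    private final ScheduledExecutorService mainThread;
    private final long periodMs;
    private final List<String> targets;
    private final Consumer<String> requestHeartbeat;
    private volatile boolean stopped;

    ToyHeartbeatSender(
            ScheduledExecutorService mainThread,
            long periodMs,
            List<String> targets,
            Consumer<String> requestHeartbeat) {
        this.mainThread = mainThread;
        this.periodMs = periodMs;
        this.targets = targets;
        this.requestHeartbeat = requestHeartbeat;
    }

    /** Kick off the first round immediately. */
    void start() {
        mainThread.schedule(this, 0L, TimeUnit.MILLISECONDS);
    }

    void stop() {
        stopped = true;
    }

    @Override
    public void run() {
        if (!stopped) {
            for (String target : targets) {
                requestHeartbeat.accept(target);
            }
            // Re-arm only after this round is done; once stopped, the chain simply ends.
            mainThread.schedule(this, periodMs, TimeUnit.MILLISECONDS);
        }
    }

    /** Returns true if three full rounds (two targets each) were observed. */
    static boolean demo() {
        ScheduledExecutorService mainThread = Executors.newSingleThreadScheduledExecutor();
        CountDownLatch sixRequests = new CountDownLatch(6);
        ToyHeartbeatSender sender = new ToyHeartbeatSender(
                mainThread, 10L, Arrays.asList("tm-1", "tm-2"), id -> sixRequests.countDown());
        sender.start();
        boolean reached;
        try {
            reached = sixRequests.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            reached = false;
        }
        sender.stop();
        mainThread.shutdownNow();
        return reached;
    }
}
```

Compared with scheduleAtFixedRate, rescheduling from inside run() means the period is measured from the end of one round to the start of the next, and a single flag is enough to end the cycle.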
HeartbeatMonitor manages a single heartbeat target. On construction it schedules its first timeout task on the ScheduledExecutor.
public class HeartbeatMonitorImpl<O> implements HeartbeatMonitor<O>, Runnable {
    /** Resource ID of the monitored heartbeat target. */
    private final ResourceID resourceID; // ID of the monitored resource
    /** Associated heartbeat target. */
    private final HeartbeatTarget<O> heartbeatTarget; // heartbeat target
    private final ScheduledExecutor scheduledExecutor;
    /** Listener which is notified about heartbeat timeouts. */
    private final HeartbeatListener<?, ?> heartbeatListener;
    // Fields not shown in this excerpt: heartbeatTimeoutIntervalMs, futureTimeout, state, lastHeartbeat

    HeartbeatMonitorImpl(
            ResourceID resourceID,
            HeartbeatTarget<O> heartbeatTarget,
            ScheduledExecutor scheduledExecutor,
            HeartbeatListener<?, O> heartbeatListener,
            long heartbeatTimeoutIntervalMs) {
        this.resourceID = Preconditions.checkNotNull(resourceID);
        this.heartbeatTarget = Preconditions.checkNotNull(heartbeatTarget);
        this.scheduledExecutor = Preconditions.checkNotNull(scheduledExecutor);
        this.heartbeatListener = Preconditions.checkNotNull(heartbeatListener);
        Preconditions.checkArgument(
                heartbeatTimeoutIntervalMs > 0L,
                "The heartbeat timeout interval has to be larger than 0.");
        this.heartbeatTimeoutIntervalMs = heartbeatTimeoutIntervalMs;
        lastHeartbeat = 0L;
        // Schedule the timeout task right at construction time
        resetHeartbeatTimeout(heartbeatTimeoutIntervalMs);
    }

    @Override
    public void run() {
        // The heartbeat has timed out if we're in state running
        if (state.compareAndSet(State.RUNNING, State.TIMEOUT)) {
            // Notify the heartbeatListener so it can handle the timeout
            heartbeatListener.notifyHeartbeatTimeout(resourceID);
        }
    }

    void resetHeartbeatTimeout(long heartbeatTimeout) {
        if (state.get() == State.RUNNING) {
            cancelTimeout();
            // Schedule a fresh timeout task
            futureTimeout =
                    scheduledExecutor.schedule(this, heartbeatTimeout, TimeUnit.MILLISECONDS);
            // Double check for concurrent accesses (e.g. a firing of the scheduled future)
            if (state.get() != State.RUNNING) {
                cancelTimeout();
            }
        }
    }
}
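The role of the compare-and-set in run() becomes clearer in a reduced model where the timer firing is simulated by calling run() directly. This is a sketch under assumptions: ToyMonitor is hypothetical, it keeps only the RUNNING and TIMEOUT states used in the excerpt above, and the one-hour timeout exists only so the real timer never fires during the demo.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical, reduced monitor that mirrors the state handling shown above. */
final class ToyMonitor implements Runnable {
    enum State { RUNNING, TIMEOUT }

    private final AtomicReference<State> state = new AtomicReference<>(State.RUNNING);
    private final ScheduledExecutorService timer;
    private final long timeoutMs;
    private final Runnable onTimeout;
    private volatile ScheduledFuture<?> futureTimeout;

    ToyMonitor(ScheduledExecutorService timer, long timeoutMs, Runnable onTimeout) {
        this.timer = timer;
        this.timeoutMs = timeoutMs;
        this.onTimeout = onTimeout;
        resetHeartbeatTimeout(); // arm the first timeout at construction time
    }

    @Override
    public void run() {
        // Only the first transition RUNNING -> TIMEOUT notifies the listener.
        if (state.compareAndSet(State.RUNNING, State.TIMEOUT)) {
            onTimeout.run();
        }
    }

    void reportHeartbeat() {
        resetHeartbeatTimeout();
    }

    private void resetHeartbeatTimeout() {
        if (state.get() == State.RUNNING) {
            cancelTimeout();
            futureTimeout = timer.schedule(this, timeoutMs, TimeUnit.MILLISECONDS);
            // The old task may have fired in between; if so, drop the new one.
            if (state.get() != State.RUNNING) {
                cancelTimeout();
            }
        }
    }

    private void cancelTimeout() {
        ScheduledFuture<?> pending = futureTimeout;
        if (pending != null) {
            pending.cancel(true);
        }
    }

    /** Returns how many timeout notifications the listener received. */
    static int demo() {
        ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
        AtomicInteger notifications = new AtomicInteger();
        ToyMonitor monitor =
                new ToyMonitor(timer, TimeUnit.HOURS.toMillis(1), notifications::incrementAndGet);
        monitor.reportHeartbeat(); // still RUNNING: the timeout is re-armed
        monitor.run();             // simulate the timeout task firing
        monitor.run();             // a duplicate firing is ignored
        monitor.reportHeartbeat(); // too late: state is TIMEOUT, nothing is re-armed
        timer.shutdownNow();
        return notifications.get();
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints 1
    }
}
```

The listener is notified exactly once, and a heartbeat that arrives after the timeout does not revive the monitor.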
HeartbeatServices creates heartbeat receivers and heartbeat senders for every component that needs the heartbeat service.
public class HeartbeatServices {
    /**
     * Creates heartbeat receivers.
     * Creates a heartbeat manager which does not actively send heartbeats.
     */
    public <I, O> HeartbeatManager<I, O> createHeartbeatManager(
            ResourceID resourceId,
            HeartbeatListener<I, O> heartbeatListener,
            ScheduledExecutor mainThreadExecutor,
            Logger log) {
        return new HeartbeatManagerImpl<>(
                heartbeatTimeout, resourceId, heartbeatListener, mainThreadExecutor, log);
    }

    /**
     * Creates heartbeat senders.
     * Creates a heartbeat manager which actively sends heartbeats to monitoring targets.
     */
    public <I, O> HeartbeatManager<I, O> createHeartbeatManagerSender(
            ResourceID resourceId,
            HeartbeatListener<I, O> heartbeatListener,
            ScheduledExecutor mainThreadExecutor,
            Logger log) {
        return new HeartbeatManagerSenderImpl<>(
                heartbeatInterval,
                heartbeatTimeout,
                resourceId,
                heartbeatListener,
                mainThreadExecutor,
                log);
    }

    // Reads the heartbeat interval and the heartbeat timeout from the configuration.
    // Relation between the two: 0 < heartbeat interval < heartbeat timeout
    public static HeartbeatServices fromConfiguration(Configuration configuration) {
        long heartbeatInterval = configuration.getLong(HeartbeatManagerOptions.HEARTBEAT_INTERVAL);
        long heartbeatTimeout = configuration.getLong(HeartbeatManagerOptions.HEARTBEAT_TIMEOUT);
        return new HeartbeatServices(heartbeatInterval, heartbeatTimeout);
    }
}
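The relation between the two settings can be written down as a small validation routine. This is an illustrative sketch, not Flink's HeartbeatServices: ToyHeartbeatConfig is hypothetical, it reads java.util.Properties instead of Flink's Configuration, and the key names heartbeat.interval and heartbeat.timeout with fallbacks of 10000 ms and 50000 ms are assumptions that should be checked against HeartbeatManagerOptions for the Flink version in use. The strict inequality follows the comment in the excerpt above.

```java
import java.util.Properties;

/** Hypothetical holder for the two heartbeat settings. */
final class ToyHeartbeatConfig {
    final long intervalMs;
    final long timeoutMs;

    ToyHeartbeatConfig(long intervalMs, long timeoutMs) {
        // Relation stated in the article: 0 < interval < timeout.
        if (intervalMs <= 0L) {
            throw new IllegalArgumentException("The heartbeat interval must be larger than 0.");
        }
        if (intervalMs >= timeoutMs) {
            throw new IllegalArgumentException("The heartbeat interval must be smaller than the timeout.");
        }
        this.intervalMs = intervalMs;
        this.timeoutMs = timeoutMs;
    }

    /** Key names and fallback values are assumptions, see the paragraph above. */
    static ToyHeartbeatConfig fromProperties(Properties props) {
        long interval = Long.parseLong(props.getProperty("heartbeat.interval", "10000"));
        long timeout = Long.parseLong(props.getProperty("heartbeat.timeout", "50000"));
        return new ToyHeartbeatConfig(interval, timeout);
    }

    static boolean isValid(long intervalMs, long timeoutMs) {
        try {
            new ToyHeartbeatConfig(intervalMs, timeoutMs);
            return true;
        } catch (IllegalArgumentException e) {
            return false;
        }
    }
}
```

With a timeout several times the interval, a few lost or delayed heartbeats do not immediately mark the peer as dead.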
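A Flink cluster has exactly one ResourceManager (RM) and one or more TaskManagers (TM). They interact as follows: when a TM starts it registers with the RM, and once registration succeeds the RM actively asks the TM to report heartbeats. Through these heartbeats each side learns whether the other is alive. From section 2.2.4 we know that HeartbeatManagerSenderImpl is the sender and HeartbeatManagerImpl is the receiver: the sender asks its heartbeat targets to report heartbeats, and the receiver returns a response when it receives such a request.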
public abstract class ResourceManager<WorkerType extends ResourceIDRetrievable>
        extends FencedRpcEndpoint<ResourceManagerId>
        implements ResourceManagerGateway, LeaderContender {
    // Runs when the RM starts
    @Override
    public final void onStart() throws Exception {
        try {
            // Start the RM services
            startResourceManagerServices();
        } catch (Throwable t) {
            final ResourceManagerException exception =
                    new ResourceManagerException(
                            String.format("Could not start the ResourceManager %s", getAddress()),
                            t);
            onFatalError(exception);
            throw exception;
        }
    }
}
StandaloneLeaderElectionService#start
|
ResourceManager#grantLeadership
|
ResourceManager#tryAcceptLeadership
|
ResourceManager#startServicesOnLeadership // implemented as follows
private void startServicesOnLeadership() {
    // Start the heartbeat services
    startHeartbeatServices();
    // slotManager is the RM component that manages slots; it will be covered in a later post
    slotManager.start(getFencingToken(), getMainThreadExecutor(), new ResourceActionsImpl());
    // Periodically checks whether any slot requests remain unfulfilled
    onLeadership();
}
Starting the heartbeat services means creating taskManagerHeartbeatManager and jobManagerHeartbeatManager, which serve the RM-TM and RM-JM heartbeats respectively.
private void startHeartbeatServices() {
    taskManagerHeartbeatManager =
            heartbeatServices.createHeartbeatManagerSender(
                    resourceId,
                    new TaskManagerHeartbeatListener(),
                    getMainThreadExecutor(),
                    log);
    jobManagerHeartbeatManager =
            heartbeatServices.createHeartbeatManagerSender(
                    resourceId,
                    new JobManagerHeartbeatListener(),
                    getMainThreadExecutor(),
                    log);
}
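Together with section 2.2.2, this shows that in its heartbeats with the TM and the JM the RM is the side that initiates heartbeat requests; that is, the RM actively pulls heartbeat information.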
TaskExecutor initializes its heartbeat components when it is created.
public TaskExecutor(
        RpcService rpcService,
        TaskManagerConfiguration taskManagerConfiguration,
        HighAvailabilityServices haServices,
        TaskManagerServices taskExecutorServices,
        ExternalResourceInfoProvider externalResourceInfoProvider,
        HeartbeatServices heartbeatServices,
        TaskManagerMetricGroup taskManagerMetricGroup,
        @Nullable String metricQueryServiceAddress,
        BlobCacheService blobCacheService,
        FatalErrorHandler fatalErrorHandler,
        TaskExecutorPartitionTracker partitionTracker,
        BackPressureSampleService backPressureSampleService) {
    // ... (other initialization omitted)
    // Create a HeartbeatManagerImpl that responds to heartbeats from the JM
    this.jobManagerHeartbeatManager =
            createJobManagerHeartbeatManager(heartbeatServices, resourceId);
    // Create a HeartbeatManagerImpl that responds to heartbeats from the RM
    this.resourceManagerHeartbeatManager =
            createResourceManagerHeartbeatManager(heartbeatServices, resourceId);
}
TaskExecutor#onStart
|
TaskExecutor#startTaskExecutorServices
|
StandaloneLeaderRetrievalService#start // analyzed for standalone mode
|
| // in standalone mode the JobManager address is already known, so the TM connects to the RM directly
TaskExecutor.ResourceManagerLeaderListener#notifyLeaderAddress
|
TaskExecutor#notifyOfNewResourceManagerLeader
|
TaskExecutor#reconnectToResourceManager
|
| // this method calls start on TaskExecutorToResourceManagerConnection to connect to the RM
TaskExecutor#connectToResourceManager
|
| // the callback set up by createNewRegistration inside this method handles the logic after a successful registration
RegisteredRpcConnection#start
|
| // this method first connects to the RM and, once connected, sends the registration request
RetryingRegistration#startRegistration
|
RetryingRegistration#register
|
TaskExecutorToResourceManagerConnection#invokeRegistration
At this point the TM has sent its registration to the RM; via Akka RPC the request arrives at the RM.
ResourceManager#registerTaskExecutor
|
| // this method returns a RegistrationResponse; inside it, taskManagerHeartbeatManager.monitorTarget is called to monitor the node's heartbeats
ResourceManager#registerTaskExecutorInternal
|
return new TaskExecutorRegistrationSuccess(
        registration.getInstanceID(), resourceId, clusterInformation)
// after a successful registration, the callback defined when start() created the registration via createNewRegistration is invoked
RegisteredRpcConnection#start
|
TaskExecutorToResourceManagerConnection#onRegistrationSuccess
|
TaskExecutor#onRegistrationSuccess
|
| // establish the connection with the RM and start monitoring the RM
TaskExecutor#establishResourceManagerConnection
|
ResourceManagerHeartbeatManager#monitorTarget
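What gets handed to monitorTarget during registration is essentially an adapter from the heartbeat interface to an RPC gateway. A minimal sketch of that adapter on the RM side follows; ToyTarget, ToyTaskExecutorGateway, and ToyRegistration are hypothetical stand-ins with string IDs, and the empty receiveHeartbeat reflects the assumption that the RM only ever requests heartbeats from the TM.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical, simplified version of the heartbeat target interface. */
interface ToyTarget<P> {
    void receiveHeartbeat(String origin, P payload);

    void requestHeartbeat(String origin, P payload);
}

/** Hypothetical stand-in for the TM's RPC gateway as seen from the RM. */
interface ToyTaskExecutorGateway {
    void heartbeatFromResourceManager(String resourceManagerId);
}

final class ToyRegistration {
    /** Wraps the gateway so the RM's sender can treat the TM as a heartbeat target. */
    static ToyTarget<Void> targetFor(ToyTaskExecutorGateway gateway) {
        return new ToyTarget<Void>() {
            @Override
            public void receiveHeartbeat(String origin, Void payload) {
                // Nothing to do: in this model the RM only requests heartbeats from the TM.
            }

            @Override
            public void requestHeartbeat(String origin, Void payload) {
                // A heartbeat request becomes an RPC call on the TM's gateway.
                gateway.heartbeatFromResourceManager(origin);
            }
        };
    }

    /** Returns the RPC calls the gateway received. */
    static List<String> demo() {
        List<String> calls = new ArrayList<>();
        ToyTaskExecutorGateway gateway =
                rmId -> calls.add("heartbeatFromResourceManager(" + rmId + ")");
        ToyTarget<Void> target = targetFor(gateway);
        target.requestHeartbeat("rm-1", null);
        target.receiveHeartbeat("rm-1", null);
        return calls;
    }
}
```

The heartbeat manager never needs to know about gateways; it only calls requestHeartbeat on whatever target was registered.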
The following focuses on the heartbeat exchange itself.
public void start() {
    checkState(!closed, "The RPC connection is already closed");
    checkState(
            !isConnected() && pendingRegistration == null,
            "The RPC connection is already started");
    // The logic to run after a successful connection is defined when newRegistration is created
    final RetryingRegistration<F, G, S, R> newRegistration = createNewRegistration();
    if (REGISTRATION_UPDATER.compareAndSet(this, null, newRegistration)) {
        // Start the registration
        newRegistration.startRegistration();
    } else {
        // concurrent start operation
        newRegistration.cancel();
    }
}
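The compareAndSet in start() guards against two starts racing each other: only the caller that swaps null for its own registration proceeds. A reduced sketch of that guard follows, with the assumptions that ToyConnection is hypothetical, a String stands in for the registration object, and the checkState calls of the original are left out so the second call can be observed losing.

```java
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;

/** Hypothetical, reduced connection that keeps only the CAS guard. */
final class ToyConnection {
    private static final AtomicReferenceFieldUpdater<ToyConnection, String> REGISTRATION_UPDATER =
            AtomicReferenceFieldUpdater.newUpdater(
                    ToyConnection.class, String.class, "pendingRegistration");

    // Must be volatile for the field updater to work.
    private volatile String pendingRegistration;

    /** Returns true only for the caller whose registration was installed. */
    boolean start(String newRegistration) {
        return REGISTRATION_UPDATER.compareAndSet(this, null, newRegistration);
    }

    String pending() {
        return pendingRegistration;
    }

    static String demo() {
        ToyConnection connection = new ToyConnection();
        boolean first = connection.start("first");
        boolean second = connection.start("second"); // loses: a registration is already pending
        return first + "," + second + "," + connection.pending();
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints true,false,first
    }
}
```

In the original method the losing caller cancels the registration it created, so at most one registration attempt is ever in flight.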
This process involves quite a few callbacks and takes some patience to follow.
From the analysis of RM initialization we know that the RM actively asks the TM to report heartbeats. The process is as follows:
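// this method creates a HeartbeatManagerSenderImpl
ResourceManager#startHeartbeatServices
|
| // the constructors are invoked in turn; the sender's constructor schedules the heartbeat task on the main thread executor
HeartbeatManagerSenderImpl constructor (calls the HeartbeatManagerImpl constructor)
|
| // when the task runs, HeartbeatManagerSenderImpl#run iterates over the HeartbeatMonitors and uses requestHeartbeat to ask each target to report a heartbeat
HeartbeatManagerSenderImpl#run
|
| // this call lands in ResourceManager.TaskManagerHeartbeatListener; it returns null because the RM is not a receiver for any component, i.e. no component requests a heartbeat from the RM and expects a reply
getHeartbeatListener().retrievePayload
|
| // this invokes the requestHeartbeat that was defined when the TM registered with the RM
heartbeatTarget.requestHeartbeat
|
| // defined in ResourceManager#registerTaskExecutorInternal
taskExecutorGateway.heartbeatFromResourceManager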
Through the RPC call the request arrives at the TM, where it is processed as follows:
TaskExecutor#heartbeatFromResourceManager
|
HeartbeatManagerImpl#requestHeartbeat
|
HeartbeatMonitorImpl#reportHeartbeat
|
| // if the state is RUNNING, this method cancels the previous timeout task (the scheduled future) and schedules a new timeout check
HeartbeatMonitorImpl#resetHeartbeatTimeout
|
| // the heartbeatPayload in the RM's request is null, so the TM goes straight to sending its heartbeat response
HeartbeatMonitorImpl#reportHeartbeat -> heartbeatTarget.receiveHeartbeat
|
| // the TM's heartbeat payload is built here, including slot information
TaskExecutor.ResourceManagerHeartbeatListener#retrievePayload
|
| // the receiveHeartbeat defined while the TM registered with the RM is what calls into the RM
TaskExecutor#establishResourceManagerConnection -> resourceManagerGateway.heartbeatFromTaskManager
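When the RM receives the TM's heartbeat it mainly does two things: it resets the timeout of the RM-side monitor, and it parses the information the TM reported.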
ResourceManager#heartbeatFromTaskManager
|
HeartbeatManagerImpl#receiveHeartbeat
|
| // as on the TM side, this resets the monitor's timeout
HeartbeatManagerImpl#reportHeartbeat -> HeartbeatMonitor#reportHeartbeat
|
| // the reported slot information is processed in this method
ResourceManager.TaskManagerHeartbeatListener#reportPayload
For the details of the periodic heartbeat, see the comments in the call chains above.
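One full round of the exchange traced above can be replayed synchronously to make the order of steps visible. This is a toy model under assumptions, not Flink code: ToyRoundTrip and its Target interface are hypothetical, direct method calls replace RPC, there are no timers, and the payload string slots=2 is made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical replay of one RM -> TM -> RM heartbeat round. */
final class ToyRoundTrip {
    /** Simplified heartbeat target with string IDs and string payloads. */
    interface Target {
        void requestHeartbeat(String origin, String payload);

        void receiveHeartbeat(String origin, String payload);
    }

    static List<String> demo() {
        List<String> log = new ArrayList<>();

        // RM side: what happens when a heartbeat response arrives.
        Target resourceManager = new Target() {
            @Override
            public void requestHeartbeat(String origin, String payload) {
                throw new UnsupportedOperationException("nobody requests heartbeats from the RM here");
            }

            @Override
            public void receiveHeartbeat(String origin, String payload) {
                log.add("RM: reset timeout for " + origin);
                if (payload != null) {
                    log.add("RM: reportPayload " + payload);
                }
            }
        };

        // TM side: answer a heartbeat request with its own payload.
        Target taskManager = new Target() {
            @Override
            public void requestHeartbeat(String origin, String payload) {
                log.add("TM: reset timeout for " + origin);
                if (payload != null) {
                    log.add("TM: reportPayload " + payload);
                }
                log.add("TM: retrievePayload");
                resourceManager.receiveHeartbeat("tm-1", "slots=2");
            }

            @Override
            public void receiveHeartbeat(String origin, String payload) {
                // Not used: the TM does not request heartbeats from the RM in this model.
            }
        };

        // One round of the RM's periodic task; the RM has no payload to send.
        log.add("RM: requestHeartbeat -> tm-1");
        taskManager.requestHeartbeat("rm", null);
        return log;
    }

    public static void main(String[] args) {
        demo().forEach(System.out::println);
    }
}
```

A single request therefore refreshes both timeouts: the TM learns that the RM is alive when the request arrives, and the RM learns that the TM is alive when the response comes back.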