深度了解flink rpc机制(四) 组件启动流程源码分析
前言
目前已发布了3篇关于Flink RPC相关的文章,分别从底层通信系统akka/Pekko,RPC实现方式动态代理以及Flink RPC相关的组件做了介绍
深度了解flink rpc机制(一)-Akka/Pekko_flink pekko akka-CSDN博客
深度了解flink rpc机制(二)-动态代理-CSDN博客
深度了解flink rpc机制(三)-组件以及交互-CSDN博客
这篇文章通过分析源码,对以上知识进行验证并串联加深印象,更深入的了解Flink RPC的实现原理。本篇文章分享TaskManager启动和向ResouceManager注册的流程,TaskManager在flink 1.12之后被更名为TaskExecutor,可能文章中两个名称都会使用,大家理解成一个就行。
TaskManage启动源码分析
入口类
TaskManager的启动类入口,以Flink的Standalone模式为例,可以在flink目录下的bin目录的flink-daemon.sh找到入口类:
. "$bin"/config.shcase $DAEMON in(taskexecutor)CLASS_TO_RUN=org.apache.flink.runtime.taskexecutor.TaskManagerRunner;;(zookeeper)CLASS_TO_RUN=org.apache.flink.runtime.zookeeper.FlinkZooKeeperQuorumPeer;;(historyserver)CLASS_TO_RUN=org.apache.flink.runtime.webmonitor.history.HistoryServer;;(standalonesession)CLASS_TO_RUN=org.apache.flink.runtime.entrypoint.StandaloneSessionClusterEntrypoint;;(standalonejob)CLASS_TO_RUN=org.apache.flink.container.entrypoint.StandaloneApplicationClusterEntryPoint;;(*)echo "Unknown daemon '${DAEMON}'. $USAGE."exit 1;;
esac
从这里可以看到Standalon模式下各个组件的启动类入口,TaskManager的入口类是TaskManageRunner,做为组件的入口类,肯定会有main方法:
public static void main(String[] args) throws Exception {// startup checks and loggingEnvironmentInformation.logEnvironmentInfo(LOG, "TaskManager", args);SignalHandler.register(LOG);JvmShutdownSafeguard.installAsShutdownHook(LOG);long maxOpenFileHandles = EnvironmentInformation.getOpenFileHandlesLimit();if (maxOpenFileHandles != -1L) {LOG.info("Maximum number of open file descriptors is {}.", maxOpenFileHandles);} else {LOG.info("Cannot determine the maximum number of open file descriptors");}//安装的方式启动taskmanager进程 runTaskManagerProcessSecurely(args);}
之后就是在TaskManageRunner的方法调用了,最终会进入到runTaskManager这个静态方法
public static int runTaskManager(Configuration configuration, PluginManager pluginManager)throws Exception {final TaskManagerRunner taskManagerRunner;try {//之前方法都是静态方法调用,初始化taskManagerRunner对象taskManagerRunner =new TaskManagerRunner(configuration,pluginManager,TaskManagerRunner::createTaskExecutorService);//开始创建TaskmanagertaskManagerRunner.start();} catch (Exception exception) {throw new FlinkException("Failed to start the TaskManagerRunner.", exception);}try {return taskManagerRunner.getTerminationFuture().get().getExitCode();} catch (Throwable t) {throw new FlinkException("Unexpected failure during runtime of TaskManagerRunner.",ExceptionUtils.stripExecutionException(t));}}
之前一直是在调用TaskManageRunner的静态方法做一些日志加载,安全检查的前置校验,此时才真正的实例化TaskManageRunner对象,调用start方法进行TaskManager的创建
//taskManagerRunner.start()public void start() throws Exception {synchronized (lock) {startTaskManagerRunnerServices();taskExecutorService.start();}
}
创建RpcService和TaskExecutor
taskManagerRunner.start()方法内部有两个方法的调用
- startTaskManagerRunnerServices()
private void startTaskManagerRunnerServices() throws Exception {synchronized (lock) {rpcSystem = RpcSystem.load(configuration);//非RPC相关 代码省略JMXService.startInstance(configuration.get(JMXServerOptions.JMX_SERVER_PORT));//创建rpcServicerpcService = createRpcService(configuration, highAvailabilityServices, rpcSystem);//非RPC相关 代码省略//创建TaskExecutortaskExecutorService =taskExecutorServiceFactory.createTaskExecutor(this.configuration,this.resourceId.unwrap(),rpcService,highAvailabilityServices,heartbeatServices,metricRegistry,blobCacheService,false,externalResourceInfoProvider,workingDirectory.unwrap(),this,delegationTokenReceiverRepository);}}
可以看到这个方法首先调用createRpcService这个方法,这个方法内部内就是去创建ActorSystem,初始化RpcService
初始化RpcServer和PekkoInvocationHandler
然后就是创建TaskExecutor,TaskExecutor继承自EndPoint,EndPoint构造方法执行的时候会初始化RpcServer
/*** Initializes the RPC endpoint.** @param rpcService The RPC server that dispatches calls to this RPC endpoint.* @param endpointId Unique identifier for this endpoint*/protected RpcEndpoint(final RpcService rpcService, final String endpointId) {this.rpcService = checkNotNull(rpcService, "rpcService");this.endpointId = checkNotNull(endpointId, "endpointId");//创建RpcServer 方法内部//1.创建Acotr通信对象PekkoRpcActor//2.对象动态代理对象PekkoInvocationHandler赋值给rpcServerthis.rpcServer = rpcService.startServer(this);this.resourceRegistry = new CloseableRegistry();this.mainThreadExecutor =new MainThreadExecutor(rpcServer, this::validateRunsInMainThread, endpointId);registerResource(this.mainThreadExecutor);}
- taskExecutorService.start()
这个方法会调用TaskExecutor对象的start方法,会调用父类EndPoint的start方法
/*** Triggers start of the rpc endpoint. This tells the underlying rpc server that the rpc* endpoint is ready to process remote procedure calls.*/public final void start() {rpcServer.start();}
rpcServer.start()方法如下
public void start() {//rpcEndpoint是Actor对象rpcEndpoint.tell(ControlMessages.START, ActorRef.noSender());}
这块儿代码就是taskmanger初始化后自己会给自己发送一个Akka START控制类的消息,准确来说是继承了EndPoint的类都会在初始化之后给自身发送一个这样的消息。
因为发的是Akka的消息,会进入到TaskExecutor的PekkoInvocationHandler#createReceive接收Akka消息的逻辑
//构造方法PekkoRpcActor(final T rpcEndpoint,final CompletableFuture<Boolean> terminationFuture,final int version,final long maximumFramesize,final boolean forceSerialization,final ClassLoader flinkClassLoader) {//省略其他代码//PekkoPrcActor初始化 会将state枚举值设置为StoppedState.STOPPEDthis.state = StoppedState.STOPPED;}//接收消息@Overridepublic Receive createReceive() {return ReceiveBuilder.create()//匹配到握手消息.match(RemoteHandshakeMessage.class, this::handleHandshakeMessage)//控制类消息.match(ControlMessages.class, this::handleControlMessage)//除以上两种之外的任意消息.matchAny(this::handleMessage).build();//处理控制类消息的逻辑 private void handleControlMessage(ControlMessages controlMessage) {try {switch (controlMessage) {case START:state = state.start(this, flinkClassLoader);break;case STOP:state = state.stop();break;case TERMINATE:state = state.terminate(this, flinkClassLoader);break;default:handleUnknownControlMessage(controlMessage);}} catch (Exception e) {this.rpcEndpointTerminationResult = RpcEndpointTerminationResult.failure(e);throw e;}}
PekkoRpcActor在初始化的时候会 将自身state属性设置为StoppedState.STOPPED;
接收到ControlMessages.START消息,会走到handleControlMessage方法的case stop分支,因为state是StoppedState.STOPPED,所以代码会走到StoppedState这个静态枚举类的start方法
public State start(PekkoRpcActor<?> pekkoRpcActor, ClassLoader flinkClassLoader) {pekkoRpcActor.mainThreadValidator.enterMainThread();try {runWithContextClassLoader(() -> pekkoRpcActor.rpcEndpoint.internalCallOnStart(), flinkClassLoader);} catch (Throwable throwable) {pekkoRpcActor.stop(RpcEndpointTerminationResult.failure(new RpcException(String.format("Could not start RpcEndpoint %s.",pekkoRpcActor.rpcEndpoint.getEndpointId()),throwable)));} finally {pekkoRpcActor.mainThreadValidator.exitMainThread();}return StartedState.STARTED;}
pekkoRpcActor.rpcEndpoint.internalCallOnStart()
这块儿代码是关键,又指定到了Endpoint定义的方法,
public final void internalCallOnStart() throws Exception {validateRunsInMainThread();isRunning = true;onStart();}protected void onStart() throws Exception {}
这块儿代码饶了半天,其实用大白话来讲就是Flink任何需要进行通信的组件都要继承Endpoint类,组件初始化之前会先初始化RpcService对象作为Endpoint子类的成员变量,然后再由RpcService初始化ActorSystem,创建Actor和代理对象,之后再给自身发一个控制类的START方法,最后一定要进入到自身的onStart方法
TaskExecutor向ResourceManager注册流程
onStart方法开始进入到向ResourceManager注册的流程
@Overridepublic void onStart() throws Exception {try {//开始向ResourceManager注册startTaskExecutorServices();} catch (Throwable t) {final TaskManagerException exception =new TaskManagerException(String.format("Could not start the TaskExecutor %s", getAddress()), t);onFatalError(exception);throw exception;}startRegistrationTimeout();}private void startTaskExecutorServices() throws Exception {try {// start by connecting to the ResourceManager//new ResourceManagerLeaderListener()是真正注册的代码resourceManagerLeaderRetriever.start(new ResourceManagerLeaderListener());//省略其他代码} catch (Exception e) {handleStartTaskExecutorServicesException(e);}}
new ResourceManagerLeaderListener()是真正注册的方法
private final class ResourceManagerLeaderListener implements LeaderRetrievalListener {@Overridepublic void notifyLeaderAddress(final String leaderAddress, final UUID leaderSessionID) {runAsync(() ->notifyOfNewResourceManagerLeader(leaderAddress,ResourceManagerId.fromUuidOrNull(leaderSessionID)));}@Overridepublic void handleError(Exception exception) {onFatalError(exception);}}
再进入到notifyOfNewResourceManagerLeader方法内部
private void notifyOfNewResourceManagerLeader(String newLeaderAddress, ResourceManagerId newResourceManagerId) {//获取ResouceManager的通信地址resourceManagerAddress =createResourceManagerAddress(newLeaderAddress, newResourceManagerId);//尝试连接ResouceMnangerreconnectToResourceManager(new FlinkException(String.format("ResourceManager leader changed to new address %s",resourceManagerAddress)));
}
reconnectToResourceManager方法内部
private void reconnectToResourceManager(Exception cause) {//如果已存在ResourceManger的连接 关闭连接closeResourceManagerConnection(cause);//设置注册超时时间startRegistrationTimeout();//继续尝试连接ResouceManagertryConnectToResourceManager();}
tryConnectToResourceManager();
private void tryConnectToResourceManager() {if (resourceManagerAddress != null) {connectToResourceManager();}}private void connectToResourceManager() {assert (resourceManagerAddress != null);assert (establishedResourceManagerConnection == null);assert (resourceManagerConnection == null);log.info("Connecting to ResourceManager {}.", resourceManagerAddress);//封装taskExecutor的信息:地址 硬件资源 内存资源final TaskExecutorRegistration taskExecutorRegistration =new TaskExecutorRegistration(getAddress(),getResourceID(),unresolvedTaskManagerLocation.getDataPort(),JMXService.getPort().orElse(-1),hardwareDescription,memoryConfiguration,taskManagerConfiguration.getDefaultSlotResourceProfile(),taskManagerConfiguration.getTotalResourceProfile(),unresolvedTaskManagerLocation.getNodeId());resourceManagerConnection =new TaskExecutorToResourceManagerConnection(log,getRpcService(),taskManagerConfiguration.getRetryingRegistrationConfiguration(),resourceManagerAddress.getAddress(),resourceManagerAddress.getResourceManagerId(),getMainThreadExecutor(),new ResourceManagerRegistrationListener(),taskExecutorRegistration);resourceManagerConnection.start();}
进入到connectToResourceManager方法,封装注册信息。进入start方法
public void start() {checkState(!closed, "The RPC connection is already closed");checkState(!isConnected() && pendingRegistration == null,"The RPC connection is already started");//创建注册成功、注册失败的回调方法final RetryingRegistration<F, G, S, R> newRegistration = createNewRegistration();if (REGISTRATION_UPDATER.compareAndSet(this, null, newRegistration)) {//开始主持newRegistration.startRegistration();} else {// concurrent start operationnewRegistration.cancel();}}
首先创建注册成功和主持失败的回调方法,然后继续进入注册的流程
public void startRegistration() {//创建动态代理对象final CompletableFuture<G> rpcGatewayFuture;//ResourceManager可能有主从,所以走Fenced这块儿if (FencedRpcGateway.class.isAssignableFrom(targetType)) {rpcGatewayFuture =(CompletableFuture<G>)rpcService.connect(targetAddress,fencingToken,targetType.asSubclass(FencedRpcGateway.class));} else {rpcGatewayFuture = rpcService.connect(targetAddress, targetType);}//省略其他代码
}private <C extends RpcGateway> CompletableFuture<C> connectInternal(final String address,final Class<C> clazz,Function<ActorRef, InvocationHandler> invocationHandlerFactory) {checkState(!stopped, "RpcService is stopped");//省略无关代码//握手确保连接正常final CompletableFuture<HandshakeSuccessMessage> handshakeFuture =final CompletableFuture<C> gatewayFuture =actorRefFuture.thenCombineAsync(handshakeFuture,(ActorRef actorRef, HandshakeSuccessMessage ignored) -> {InvocationHandler invocationHandler =invocationHandlerFactory.apply(actorRef);ClassLoader classLoader = getClass().getClassLoader();//真正核心的代码 创建代理的实现@SuppressWarnings("unchecked")C proxy =(C)Proxy.newProxyInstance(classLoader,new Class<?>[] {clazz},invocationHandler);return proxy;},actorSystem.dispatcher());return guardCompletionWithContextClassLoader(gatewayFuture, flinkClassLoader);}
然后就会走到RpcService获取到ReouceManager的代理,然后将代理对象和主持方法通过akka消息发送到ResouceManager的RpcActor,然后进入消息处理,执行代理的对象的注册方法,也就是ResouceManager的注册方法,从而将TaskManager进行注册
启动注册流程图
相关文章:

深度了解flink rpc机制(四) 组件启动流程源码分析
前言 目前已发布了3篇关于Flink RPC相关的文章,分别从底层通信系统akka/Pekko,RPC实现方式动态代理以及Flink RPC相关的组件做了介绍 深度了解flink rpc机制(一)-Akka/Pekko_flink pekko akka-CSDN博客 深度了解flink rpc机制&…...

C++基于opencv的视频质量检测--遮挡检测
文章目录 0.引言1. 原始代码分析1.1 存在的问题 2. 优化方案3. 优化后的代码4. 代码详细解读4.1. 输入检查4.2. 图像预处理4.3. 高斯模糊4.4. 梯度计算4.5. 计算梯度幅值和方向4.6. 边缘检测4.7. 计算边缘密度4.8. 估计遮挡程度4.9. 限定结果范围4.10. 返回结果 0.引言 视频质…...

手机玩潜水员戴夫?GameViewer远程如何随时随地玩潜水员戴夫教程
如果你是潜水员戴夫的忠实玩家,你知道如何在手机上玩潜水员戴夫吗?潜水员戴夫是一个以神秘蓝洞为背景的海洋冒险游戏。在这个游戏里你白天可以在美丽的大海里打鱼,晚上可以经营寿司店。现在这个游戏也能实现用手机随时随地畅玩了!…...

UE5 喷射背包
首选创建一个输入操作 然后在输入映射中添加,shift是向上飞,ctrl是向下飞 进入人物蓝图中编写逻辑,变量HaveJatpack默认true,Thrust为0 最后...

【Vue3】第三篇
Vue3学习第三篇 01. 组件组成02. 组件嵌套关系03. 组件注册方式04. 组件传递数据Props05. 组件传递多种数据类型06. 组件传递Props校验07. 组件事件08. 组件事件配合v-model使用09. 组件数据传递10. 透传Attributes 01. 组件组成 在vue当中,组件是最重要的知识&…...

c++二级指针
如果要通过函数改变一个指针的值,要往函数中传入指针的指针 如果要通过函数改变一个变量的值,那就要往函数中传入这个变量的地址 改变a的值和b的值 #include <iostream>using namespace std;void swap(int* a, int* b) {int temp *a;*a *b;*b …...

客户端存储 — IndexedDB 实现分页查询
前言 相信 IndexedDB 大家都有过了解,但是不一定每个人都有过实践,并且其中涉及到事务、游标等概念,会导致在初次使用时会有些不适应,那么本文会通过 IndexedDB 实现分页查询的形式进行实践,在开始之前,可…...

logback 如何将日志输出到文件
如何作 将日志输出到文件需要使用 RollingFileAppender,该 Appender 必须定义 rollingPolicy ,另外 rollingPollicy 下必须定义 fileNamePattern 和 encoder <appender name"fileAppender" class"ch.qos.logback.core.rolling.Rollin…...

Files.newBufferedReader和Files.readAllLines
在Java中,Files.newBufferedReader 和 Files.readAllLines 都是用于从文件中读取数据的工具方法,但它们的使用场景和功能有所不同。下面我将详细解释这两个方法的含义、用途、区别、优缺点以及各自的使用场景。 1. Files.newBufferedReader 含义和用途…...

MySQL 数据库备份与恢复全攻略
MySQL 数据库备份与恢复全攻略 引言 在现代应用中,数据库是核心组件之一。无论是个人项目还是企业级应用,数据的安全性和完整性都至关重要。为了防止数据丢失、损坏或意外删除,定期备份数据库是必不可少的。本文将详细介绍 MySQL 数据库的备…...

Appium中的api(一)
目录 1.基础python代码准备 1--参数的一些说明 2--python内所要编写的代码 解释 2.如何获取包名和界面名 1-api 2-完整代码 代码解释 3.如何关闭驱动连接 4.安装卸载app 1--卸载 2--安装 5.判断app是否安装 6.将应用放到后台在切换为前台的时间 7.UIAutomatorViewer的使用 1--找…...

【AI辅助设计】没错!训练FLUX LoRA就这么简单!
前言 得益于开源社区的力量,在各位大佬的努力下,现在16G VRAM的家用电脑也可以训练FLUX的LoRA了 👏。 今天我使用fluxgym这个方法,训练LoRA,并记录过程。 篇幅有限,这里就不一一展示了,有需要的…...

Mac 下安装FastDFS
首先我们需要下载相对应的安装包: libfastcommonFastDFS 下载完成后我们先将其解压到桌面。 1.安装libfastcommon 我们进入到libfastcommon-master目录中执行./make.sh和sudo ./make.sh install,具体代码如下: 2.安装FastDFS 同安装libfa…...

人工智能的未来:重塑生活与工作的变革者
随着人工智能(AI)技术的快速发展,我们正处于一个前所未有的变革时代。AI不仅在医疗、企业运营和日常生活中发挥着重要作用,而且正在重新定义我们的生活和工作方式。本文将探讨人工智能技术的应用前景以及它如何改变我们的生活和工…...

【微服务】Java 对接飞书多维表格使用详解
目录 一、前言 二、前置操作 2.1 开通企业飞书账户 2.2 确保账户具备多维表操作权限 2.3 创建一张测试用的多维表 2.4 获取飞书开放平台文档 2.5 获取Java SDK 三、应用App相关操作 3.1 创建应用过程 3.2 应用发布过程 3.3 应用添加操作权限 四、多维表应用授权操作…...

学习threejs,使用粒子实现下雪特效
👨⚕️ 主页: gis分享者 👨⚕️ 感谢各位大佬 点赞👍 收藏⭐ 留言📝 加关注✅! 👨⚕️ 收录于专栏:threejs gis工程师 文章目录 一、🍀前言1.1 ☘️THREE.Points简介1.11 ☘️…...

unity3d——Time
在Unity3D中,Time类是一个非常重要的工具类,它提供了一系列与时间相关的属性和方法,帮助开发者在游戏中实现各种时间相关的操作。以下是一些Time类常用的方法及其示例: 一、常用属性 Time.time 含义:表示从游戏开始到…...

天地图实现海量聚合marker--uniapp后端详细实现
本文章详细的讲解了前后端代码来 实现uniapp天地图功能的实现 以及 后端海量数据的聚合查询 和网格算法实现思路。 并对当数据量增加和用户频繁请求接口时可能导致服务器负载过高做了前后端优化。 前端uniapp: 实现了天地图的行政区划边界/地图切换/比例尺/海量数…...

Bug | 项目中数据库查询问题
问题描述 理论上,点击查询后,表头应当显示中文。而不是上面的在数据库中的表头【如上图示】 正常点击查询后,如果没有输入值,应当是查询所有的信息。 原因分析: 这里是直接使用SELECT * 导致的。例如: S…...

C++入门基础知识129—【关于C 库函数 - time()】
成长路上不孤单😊😊😊😊😊😊 【14后😊///C爱好者😊///持续分享所学😊///如有需要欢迎收藏转发///😊】 今日分享关于C 库函数 - time()的相关内容࿰…...

大文件秒传,分片上传,断点续传
大文件分片上传 一 功能描述 1.文件通过web端分片多线程上传到服务端,然后web端发起分片合并,完成大文件分片上传功能 2.上传过的大文件,实现秒传 3.上传过程中,服务异常退出,实现断点续传 二 流程图 三 代码运行…...

多生境扩增子探秘:深度溯源与多样性解析
分析微生物组数据的组成结构的一个主要挑战是确定其潜在来源。在微生物来源分析中,随机森林、SourceTracker和FEAST都有较广泛应用。今天,小编就带大家看一篇发表在《iMeta》的文章,使用溯源技术追踪微生物的来源与去向,揭示生物在…...

Selenium4自动化测试常用函数总结,各种场景操作实战
🍅 点击文末小卡片 ,免费获取软件测试全套资料,资料在手,涨薪更快 seleninum作为自动化测试的工具,自然是提供了很多自动化操作的函数,下面列举下比较常用的函数,更多可见官方文档:…...

图像生成新范式:智源推出全能视觉生成模型 OmniGen
大型语言模型(LLM)的出现统一了语言生成任务,并彻底改变了人机交互。然而,在图像生成领域,能够在单一框架内处理各种任务的统一模型在很大程度上仍未得到探索。近日,智源推出了新的扩散模型架构 OmniGen&am…...

实现RPC接口的demo记录
1.Thrift RPC 接口实现 Demo Service public class DemoServiceImpl implements DemoService.Iface {private static final Logger logger LoggerFactory.getLogger(DemoServiceImpl.class);Overridepublic String sayHello(Context context, String msg) throws TException …...

Python期末题目 | 期末练习题【概念题+代码】
一、前言 Python 是一门功能强大且易于学习的编程语言,在高校中被广泛用作教学语言。Python 的期末考试通常会包含基础知识和编程实践,以考察学生的理解与应用能力。本文整理了一套 Python 期末练习题,包括选择题、填空题、判断题和代码题。…...

OpenCV基本操作(python开发)——(6)视频基本处理
OpenCV——视频基本处理 一、读取摄像头 import numpy as np import cv2cap cv2.VideoCapture(0) # 实例化VideoCapture对象, 0表示第一个摄像头 while cap.isOpened():ret, frame cap.read() # 捕获帧cv2.imshow("frame", frame)c cv2.waitKey(1) # 等待1毫…...

详解Java之Spring MVC篇一
目录 Spring MVC 官方介绍 MVC RequestMapping 传递参数 无参数 单个参数 针对String类型 针对Integer类型 针对int类型 针对自定义类型 多个参数 参数重命名 参数强制一致 参数不强制一致 传递数组 编辑传递List 编辑 传递JSON 编辑 从路径中获取参…...

ubuntu20.04上使用 Verdaccio 搭建 npm 私有仓库
安装nvm 首先安装必要的工具: apt update apt install curl下载并执行nvm安装脚本: curl -o- https://raw.githubusercontent.com/nvm-sh/nvm/v0.39.7/install.sh | bash添加环境变量(如果安装脚本没有自动添加)。编辑 ~/.bash…...

Python实现办公自动化的数据可视化与报表生成
在 Python 中,可以利用多个库来实现办公自动化中的数据可视化与报表生成。以下是具体的方法: 一、数据可视化 使用 matplotlib 库 matplotlib 是一个强大的 Python 绘图库,可以创建各种类型的静态、动态和交互式图表。示例代码:i…...