东营网站建设seo/营销型网站建设题库
背景
Flink版本 1.12.2
Kafka 客户端 2.4.1
在公司的Flink平台运行了一个读Kafka计算DAU的流程序,由于公司Kafka的缩容,直接导致了该程序一直在重启,重启了一个小时都还没恢复(具体的所容操作是下掉了四台kafka broker,而当时flink配置了12台kafka broker),当时具体的现场如下:
JobManaer上的日志如下:
2023-10-07 10:02:52.975 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: TableSourceScan(table=[[default_catalog, default_database, ubt_start, watermark=[-(LOCALTIMESTAMP, 1000:INTERVAL SECOND)]]]) (34/64) (e33d9ad0196a71e8eb551c181eb779b5) switched from RUNNING to FAILED on container_e08_1690538387235_2599_01_000010 @ task-xxxx-shanghai.emr.aliyuncs.com (dataPort=xxxx).
org.apache.flink.streaming.connectors.kafka.internals.Handover$ClosedException: nullat org.apache.flink.streaming.connectors.kafka.internals.Handover.close(Handover.java:177)at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.cancel(KafkaFetcher.java:164)at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.cancel(FlinkKafkaConsumerBase.java:945)at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.lambda$createAndStartDiscoveryLoop$2(FlinkKafkaConsumerBase.java:913)at java.lang.Thread.run(Thread.java:750)对应的 TaskManager(task-xxxx-shanghai.emr.aliyuncs.com)上的日志如下:2023-10-07 10:02:24.604 WARN org.apache.kafka.clients.NetworkClient - [Consumer clientId=xxxx] Connection to node 46129 (sh-bs-b1-303-i14-kafka-129-46.ximalaya.local/192.168.129.46:9092) could not be established. Broker may not be available.2023-10-07 10:02:52.939 WARN org.apache.flink.runtime.taskmanager.Task - Source: TableSourceScan(t) (34/64)#0 (e33d9ad0196a71e8eb551c181eb779b5) switched from RUNNING to FAILED.
org.apache.flink.streaming.connectors.kafka.internals.Handover$ClosedException: nullat org.apache.flink.streaming.connectors.kafka.internals.Handover.close(Handover.java:177)at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.cancel(KafkaFetcher.java:164)at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.cancel(FlinkKafkaConsumerBase.java:945)at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.lambda$createAndStartDiscoveryLoop$2(FlinkKafkaConsumerBase.java:913)at java.lang.Thread.run(Thread.java:750)2023-10-07 10:04:58.205 WARN org.apache.kafka.clients.NetworkClient - [Consumer clientId=xxx, groupId=xxxx] Connection to node -4 (xxxx:909) could not be established. Broker may not be available.
2023-10-07 10:04:58.205 WARN org.apache.kafka.clients.NetworkClient - [Consumer clientId=xxx, groupId=xxxx] Bootstrap broker sxxxx:909 (id: -4 rack: null) disconnected
2023-10-07 10:04:58.206 WARN org.apache.kafka.clients.NetworkClient - [Consumer clientId=xxx, groupId=xxxxu] Connection to node -5 (xxxx:9092) could not be established. Broker may not be available.
2023-10-07 10:04:58.206 WARN org.apache.kafka.clients.NetworkClient - [Consumer clientId=xxx, groupId=xxxxu] Bootstrap broker xxxx:9092 (id: -5 rack: null) disconnected2023-10-07 10:08:15.541 WARN org.apache.flink.runtime.taskmanager.Task - Source: TableSourceScan(xxx) switched from RUNNING to FAILED.
org.apache.kafka.common.errors.TimeoutException: Timeout expired while fetching topic metadata
当时Flink中kafka source的相关配置如下:
scan.topic-partition-discovery.interval 300000
restart-strategy.type fixed-delay
restart-strategy.fixed-delay.attempts 50000000
jobmanager.execution.failover-strategy region
结论以及解决
目前在kafka 消费端有两个参数default.api.timeout.ms(默认60000),request.timeout.ms(默认30000),这两个参数来控制kakfa的客户端从服务端请求超时,也就是说每次请求的超时时间是30s,超时之后可以再重试,如果在60s内请求没有得到任何回应,则会报TimeOutException,具体的见如下分析,
我们在flink kafka connector中通过设置如下参数来解决:
`properties.default.api.timeout.ms` = '600000',
`properties.request.timeout.ms` = '5000',
// max.block.ms是设置kafka producer的超时
`properties.max.block.ms` = '600000',
分析
在Flink中对于Kafka的Connector的DynamicTableSourceFactory是KafkaDynamicTableFactory,这里我们只讨论kafka作为source的情况,
而该类的方法createDynamicTableSource最终会被调用,至于具体的调用链可以参考Apache Hudi初探(四)(与flink的结合)–Flink Sql中hudi的createDynamicTableSource/createDynamicTableSink/是怎么被调用–只不过把Sink改成Source就可以了,所以最终会到KafkaDynamicSource类:
@Overridepublic ScanRuntimeProvider getScanRuntimeProvider(ScanContext context) {final DeserializationSchema<RowData> keyDeserialization =createDeserialization(context, keyDecodingFormat, keyProjection, keyPrefix);final DeserializationSchema<RowData> valueDeserialization =createDeserialization(context, valueDecodingFormat, valueProjection, null);final TypeInformation<RowData> producedTypeInfo =context.createTypeInformation(producedDataType);final FlinkKafkaConsumer<RowData> kafkaConsumer =createKafkaConsumer(keyDeserialization, valueDeserialization, producedTypeInfo);return SourceFunctionProvider.of(kafkaConsumer, false);}
该类的getScanRuntimeProvider方法会被调用,所有kafka相关的操作都可以追溯到FlinkKafkaConsumer类(继承FlinkKafkaConsumerBase)中,对于该类重点的方法如下:
@Overridepublic final void initializeState(FunctionInitializationContext context) throws Exception {OperatorStateStore stateStore = context.getOperatorStateStore();this.unionOffsetStates =stateStore.getUnionListState(new ListStateDescriptor<>(OFFSETS_STATE_NAME,createStateSerializer(getRuntimeContext().getExecutionConfig())));... }@Overridepublic void open(Configuration configuration) throws Exception {// determine the offset commit modethis.offsetCommitMode =OffsetCommitModes.fromConfiguration(getIsAutoCommitEnabled(),enableCommitOnCheckpoints,((StreamingRuntimeContext) getRuntimeContext()).isCheckpointingEnabled());// create the partition discovererthis.partitionDiscoverer =createPartitionDiscoverer(topicsDescriptor,getRuntimeContext().getIndexOfThisSubtask(),getRuntimeContext().getNumberOfParallelSubtasks());this.partitionDiscoverer.open();subscribedPartitionsToStartOffsets = new HashMap<>();final List<KafkaTopicPartition> allPartitions = partitionDiscoverer.discoverPartitions();if (restoredState != null) {...} else {// use the partition discoverer to fetch the initial seed partitions,// and set their initial offsets depending on the startup mode.// for SPECIFIC_OFFSETS and TIMESTAMP modes, we set the specific offsets now;// for other modes (EARLIEST, LATEST, and GROUP_OFFSETS), the offset is lazily// determined// when the partition is actually read.switch (startupMode) {。。。default:for (KafkaTopicPartition seedPartition : allPartitions) {subscribedPartitionsToStartOffsets.put(seedPartition, startupMode.getStateSentinel());}}if (!subscribedPartitionsToStartOffsets.isEmpty()) {switch (startupMode) {...case GROUP_OFFSETS:LOG.info("Consumer subtask {} will start reading the following {} partitions from the committed group offsets in Kafka: {}",getRuntimeContext().getIndexOfThisSubtask(),subscribedPartitionsToStartOffsets.size(),subscribedPartitionsToStartOffsets.keySet());}} else {LOG.info("Consumer subtask {} initially has no partitions to read from.",getRuntimeContext().getIndexOfThisSubtask());}}this.deserializer.open(RuntimeContextInitializationContextAdapters.deserializationAdapter(getRuntimeContext(), metricGroup -> metricGroup.addGroup("user")));}@Overridepublic void run(SourceContext<T> sourceContext) throws Exception {if (subscribedPartitionsToStartOffsets == null) {throw new Exception("The partitions were not set for the consumer");}// initialize commit metrics and default offset callback methodthis.successfulCommits =this.getRuntimeContext().getMetricGroup().counter(COMMITS_SUCCEEDED_METRICS_COUNTER);this.failedCommits =this.getRuntimeContext().getMetricGroup().counter(COMMITS_FAILED_METRICS_COUNTER);final int subtaskIndex = this.getRuntimeContext().getIndexOfThisSubtask();this.offsetCommitCallback =new KafkaCommitCallback() {@Overridepublic void onSuccess() {successfulCommits.inc();}@Overridepublic void onException(Throwable cause) {LOG.warn(String.format("Consumer subtask %d failed async Kafka commit.",subtaskIndex),cause);failedCommits.inc();}};// mark the subtask as temporarily idle if there are no initial seed partitions;// once this subtask discovers some partitions and starts collecting records, the subtask's// status will automatically be triggered back to be active.if (subscribedPartitionsToStartOffsets.isEmpty()) {sourceContext.markAsTemporarilyIdle();}LOG.info("Consumer subtask {} creating fetcher with offsets {}.",getRuntimeContext().getIndexOfThisSubtask(),subscribedPartitionsToStartOffsets);// from this point forward:// - 'snapshotState' will draw offsets from the fetcher,// instead of being built from `subscribedPartitionsToStartOffsets`// - 'notifyCheckpointComplete' will start to do work (i.e. commit offsets to// Kafka through the fetcher, if configured to do so)this.kafkaFetcher =createFetcher(sourceContext,subscribedPartitionsToStartOffsets,watermarkStrategy,(StreamingRuntimeContext) getRuntimeContext(),offsetCommitMode,getRuntimeContext().getMetricGroup().addGroup(KAFKA_CONSUMER_METRICS_GROUP),useMetrics);if (!running) {return;}if (discoveryIntervalMillis == PARTITION_DISCOVERY_DISABLED) {kafkaFetcher.runFetchLoop();} else {runWithPartitionDiscovery();}}@Overridepublic final void snapshotState(FunctionSnapshotContext context) throws Exception {...HashMap<KafkaTopicPartition, Long> currentOffsets = fetcher.snapshotCurrentState();if (offsetCommitMode == OffsetCommitMode.ON_CHECKPOINTS) {// the map cannot be asynchronously updated, because only one checkpoint call// can happen// on this function at a time: either snapshotState() or// notifyCheckpointComplete()pendingOffsetsToCommit.put(context.getCheckpointId(), currentOffsets);}for (Map.Entry<KafkaTopicPartition, Long> kafkaTopicPartitionLongEntry :currentOffsets.entrySet()) {unionOffsetStates.add(Tuple2.of(kafkaTopicPartitionLongEntry.getKey(),kafkaTopicPartitionLongEntry.getValue()));}... }}@Overridepublic final void notifyCheckpointComplete(long checkpointId) throws Exception {...fetcher.commitInternalOffsetsToKafka(offsets, offsetCommitCallback);...}
主要是initializeState,open,run,snapshotState,notifyCheckpointComplete这四个方法,下面带着问题逐一介绍一下:
注意:对于initializeState和open方法的先后顺序,可以参考StreamTask类,其中如下的调用链:
invoke()||\/
beforeInvoke()||\/
operatorChain.initializeStateAndOpenOperators||\/
FlinkKafkaConsumerBase.initializeState||\/
FlinkKafkaConsumerBase.open
就可以知道 initializeState方法的调用是在open之前的
initializeState方法
这里做的事情就是从持久化的State中恢复kafkaTopicOffset信息,我们这里假设是第一次启动
open方法
- offsetCommitMode
offsetCommitMode = OffsetCommitModes.fromConfiguration
这里获取设置的kafka offset的提交模式,这里会综合enable.auto.commit
的配置(默认是true),enableCommitOnCheckpoints默认是true,checkpointing设置为true(默认是false),综合以上得到的值为OffsetCommitMode.ON_CHECKPOINTS - partitionDiscoverer
这里主要是进行kafka的topic的分区发现,主要路程是partitionDiscoverer.discoverPartitions
,这里的涉及的流程如下:
综上所述,discoverPartitions做的就是根据某种策略选择配置的broker节点,对每个节点进行请求,request.timeout.ms超时后,再根据策略选择broker,直至总的时间达到了配置的default.api.timeout.ms,这里默认AbstractPartitionDiscoverer.discoverPartitions||\/ AbstractPartitionDiscoverer.getAllPartitionsForTopics ||\/ KafkaPartitionDiscoverer.kafkaConsumer.partitionsFor||\/ KafkaConsumer.partitionsFor(topic, Duration.ofMillis(defaultApiTimeoutMs)) //这里的defaultApiTimeoutMs 来自于*default.api.timeout.ms*||\/ Fetcher.getTopicMetadata //这里面最后抛出 new TimeoutException("Timeout expired while fetching topic metadata");||\/ Fetcher.sendMetadataRequest => NetworkClient.leastLoadedNode //这里会根据某种策略选择配置的broker的节点||\/ client.poll(future, timer) => NetworkClient.poll => selector.poll(Utils.min(timeout, metadataTimeout, defaultRequestTimeoutMs)); // 这里的 *defaultRequestTimeoutMs* 来自配置*request.timeout.ms*
default.api.timeout.ms
为60秒,request.timeout.ms
为30秒 - subscribedPartitionsToStartOffsets
根据startupMode模式,默认是StartupMode.GROUP_OFFSETS(默认从上次消费的offset开始消费),设置开启的kafka offset,这在kafkaFetcher中会用到
run方法
- 设置一些指标successfulCommits/failedCommits
- KafkaFetcher
这里主要是从kafka获取数据以及如果有分区发现则循环进kafka的topic分区发现,这里会根据配置scan.topic-partition-discovery.interval默认配置为0,实际中设置的为300000,即5分钟。该主要的流程为在方法runWithPartitionDiscovery:private void runWithPartitionDiscovery() throws Exception {final AtomicReference<Exception> discoveryLoopErrorRef = new AtomicReference<>();createAndStartDiscoveryLoop(discoveryLoopErrorRef);kafkaFetcher.runFetchLoop();// make sure that the partition discoverer is waked up so that// the discoveryLoopThread exitspartitionDiscoverer.wakeup();joinDiscoveryLoopThread();// rethrow any fetcher errorsfinal Exception discoveryLoopError = discoveryLoopErrorRef.get();if (discoveryLoopError != null) {throw new RuntimeException(discoveryLoopError);}}
-
createAndStartDiscoveryLoop 这个会启动单个线程以while sleep方式实现以scan.topic-partition-discovery.interval为间隔来轮询进行Kafka的分区发现,注意这里会吞没Execption,并不会抛出异常
private void createAndStartDiscoveryLoop(AtomicReference<Exception> discoveryLoopErrorRef) {discoveryLoopThread =new Thread(...while (running) {...try {discoveredPartitions =partitionDiscoverer.discoverPartitions();} catch (AbstractPartitionDiscoverer.WakeupException| AbstractPartitionDiscoverer.ClosedException e) {break;}if (running && !discoveredPartitions.isEmpty()) {kafkaFetcher.addDiscoveredPartitions(discoveredPartitions);}if (running && discoveryIntervalMillis != 0) {try {Thread.sleep(discoveryIntervalMillis);} catch (InterruptedException iex) {break;}}}} catch (Exception e) {discoveryLoopErrorRef.set(e);} finally {// calling cancel will also let the fetcher loop escape// (if not running, cancel() was already called)if (running) {cancel();}}},"Kafka Partition Discovery for "+ getRuntimeContext().getTaskNameWithSubtasks());discoveryLoopThread.start(); }
这里的kafkaFetcher.addDiscoveredPartitions(discoveredPartitions);中subscribedPartitionStates变量会把发现分区信息保存起来,这在kafkaFetcher.runFetchLoop中会设置已经提交的offset信息,并且会在snapshotState会用到
-
kafkaFetcher.runFetchLoop 这里会从kafka拉取数据,并设置kafka的offset,具体的流程如下:
runFetchLoop ||\/subscribedPartitionStates 这里会获取*subscribedPartitionStates*变量||\/partitionConsumerRecordsHandler||\/emitRecordsWithTimestamps||\/emitRecordsWithTimestamps||\/partitionState.setOffset(offset);
这里的offset就是从消费的kafka记录中获取的
-
snapshotState方法
这里会对subscribedPartitionStates中的信息进行处理,主要是加到pendingOffsetsToCommit变量中
- offsetCommitMode
这里上面说到是OffsetCommitMode.ON_CHECKPOINTS,如果是ON_CHECKPOINTS,则会从fetcher.snapshotCurrentState获取subscribedPartitionStates
并加到pendingOffsetsToCommit,并持久化到unionOffsetStates中,这实际的kafka offset commit操作在notifyCheckpointComplete中,
notifyCheckpointComplete方法
获取到要提交的kafka offset信息,并持久化保存kafka中
参考
- open 和 initailizeState的初始化顺序
- A single failing Kafka broker may cause jobs to fail indefinitely with TimeoutException: Timeout expired while fetching topic metadata
相关文章:

Flink 中kafka broker缩容导致Task一直重启
背景 Flink版本 1.12.2 Kafka 客户端 2.4.1 在公司的Flink平台运行了一个读Kafka计算DAU的流程序,由于公司Kafka的缩容,直接导致了该程序一直在重启,重启了一个小时都还没恢复(具体的所容操作是下掉了四台kafka broker࿰…...

纯前端js中使用sheetjs导出excel,并且合并标题
先定义变量----用的是Vue2 ,以下在vue的data:{}中定义--------------//空格占位符 headerTopTitle: [患者信息, , , , , , , , , 入出院信息, , , , , , , 病案首页中的出院主要诊断, ,出院其他诊断(病案首页中原始信息), , , , ,…...

猫眼 校园招聘_1面
(1)打包和构建工具 vite 和 webpack 功能 1. 构建原理: Webpack 是一个静态模块打包器,通过对项目中的JavaScript、css、Image 等文件进行分析,生成对应的静态资源,并且通过一些插件和加载器来实现各种功…...

博弈论——博弈信息结构
博弈信息结构 0 引言 在一个博弈构成中,博弈信息结构是不可或缺要素。博弈信息,顾名思义,就是在博弈中,博弈方对于信息的了解。知己知彼,百战不殆。和短兵相接的战争一样,只有充分了解自己的优劣势&#x…...

求二叉树的高度——函数递归的思想
二叉树的高度:左右两个数最高的那个的1 int TreeHight(BTNode* root) {if (root NULL){return 0;}int lefhightTreeHight(root->left);int righthight TreeHight(root->right);return lefhight > righthight ? TreeHight(root->left) 1 : TreeHight…...

ue5蓝图请求接口
安装与使用 1、在虚幻商城搜索 VaRest 插件 2、选择自己项目的对应版本安装 3、查看是否安装成功 4、进入项目后,分别启动VaRest、JSON Blueprint Utilities两个插件(勾选后会提示重启项目) 5、基本用法:打开关卡蓝图使用…...

windows server 2012 查看已打了哪些补丁
打开控制面板 点击卸载程序 点击 查看已安装的更新 下图是已安装的补丁...

参加CSP-J第一轮后的感受
本人现在初二。作为一名学了4年多c的人,我一直都挺想考过CSP。于是,去年我就去考了。 当时初一,感觉自己实力不够,就只报了J组的。果不其然,63分,没过。 经过1年的苦练,今年又去考了。 J组78分&…...

rust 智能指针
智能指针 Box Box 的使用场景 由于 Box 是简单的封装,除了将值存储在堆上外,并没有其它性能上的损耗。而性能和功能往往是鱼和熊掌,因此 Box 相比其它智能指针,功能较为单一,可以在以下场景中使用它: 特…...

CentOS 7系统安装配置Zabbix 5.0LTS 步骤
目录 一、查看Zabbix官方教程(重点) 二、安装 Docker 创建 Mysql 容器 安装 Docker 依赖包 添加 Docker 官方仓库 安装 Docker 引擎 启动 Docker 服务并设置开机自启 验证 Docker 是否成功安装 拉取 MySQL 镜像 查看本地镜像 运行容器 停止和启…...

【学习之路】Multi Agent Reinforcement Learning框架与代码
【学习之路】Multi Agent Reiforcement Learning框架与代码 Introduction 国庆期间,有个客户找我写个代码,是强化学习相关的,但我没学过,心里那是一个慌,不过好在经过详细的调研以及自身的实力,最后还是解…...

android 13.0 SystemUI导航栏添加虚拟按键功能(二)
1.概述 在13.0的系统产品开发中,对于在SystemUI的原生系统中默认只有三键导航,想添加其他虚拟按键就需要先在构建导航栏的相关布局 中分析结构,然后添加相关的图标xml就可以了,然后添加对应的点击事件,就可以了,接下来先分析第二步关于导航栏的相关布局情况 然后实现功能…...

Java8 新特性之Stream(二)-- Stream的中间操作
目录 1.filter(Predicate) 2.map(Function) 3.flatMap(Function) 4.distinct() 5.sorted([Comparator]) 6.limit(n) 7.skip(n) 8.peek(Consumer)...

CA与区块链之数字签名详解
CA与区块链验证本质上都是数字签名,首先,我们看一下什么是数字签名! 数字签名 数字签名是公钥密码学中的一种技术,用于验证信息的完整性和发送者的身份。简而言之,数字签名是一种确认信息来源和信息完整性的手段。它通…...

一文解读如何应用 REST 对资源进行访问?
文章目录 一、REST 简介二、涉及注解2.1 RequestMapping2.2 PathVariable2.3 RestController2.4 GetMapping、PostMapping、PutMapping、DeleteMapping补充:PathVariable、RequestBody、RequestParam 区别与应用 三、REST风格案例 一、REST 简介 REST (Representat…...

使用JAVA发送邮件
这里用java代码编写发送邮件我采用jar包,需要先点击这里下载三个jar包:这三个包分别为:additionnal.jar;activation.jar;mail.jar。这三个包缺一不可,如果少添加或未添加均会报下面这个错误: C…...

【JavaEE】_servlet程序的编写方法
目录 1. 创建项目 2. 引入依赖 3. 创建目录结构 3.1 在main目录下创建一个webapp目录 3.2 在webapp目录下创建一个WEB-INF目录 3.3 在WEB-INF目录下创建一个web.xml文件 3.4 在web.xml中进行代码编写 4. 编写代码 4.1 在java目录下创建类 4.2 打印"hello world&…...

美国市场三星手机超苹果 中国第一属华为
报告显示,截至5月份的三个月,iOS系统在美国、澳大利亚以及日本表现不俗。Android系统份额则在英国、德国以及法国实现增长。在中国城市地区,iOS份额同比基本持平,而Android份额则达到80.5%,同比增长1个百分点。 三星在…...

nodejs+vue+elementui医院挂号预约管理系统4n9w0
前端技术:nodejsvueelementui 前端:HTML5,CSS3、JavaScript、VUE 1、 node_modules文件夹(有npn install Express 框架于Node运行环境的Web框架, 开发语言 node.js 框架:Express 前端:Vue.js 数据库:mysql 数据库工具ÿ…...

调试技巧(课件图解)
...

react中获取input输入框内容的两种方法
一.通过event对象信息的方式 <input onChange{(e)>this.inputChange(e)}/> <button onClick{()>this.getInputValue} >获取input的值</button>inputChange(e){alert(e.target.value)this.setState({username:e.target.value}) } getInputValue(){aler…...

Linux基础—1
1、命令行 1) 重要快捷键 按键作用Tab命令补全Ctrl强行终止当前程序Ctrld键盘输入结束或退出终端Ctrls暂停当前程序,暂停后按下任意键恢复运行Ctrlz将当前程序放到后台运行,恢复到前台为命令fgCtrla将光标移至输入行头,相当于Home键Ctrle将…...
十个面试排序算法
一、 前言 最常考的是快速排序和归并排序,并且经常有面试官要求现场写出这两种排序的代码。对这两种排序的代码一定要信手拈来才行。还有插入排序、冒泡排序、堆排序、基数排序、桶排序等。面试官对于这些排序可能会要求比较各自的优劣、各种算法的思想及其使用场景…...

技术学习群-第四期内容共享
本期是技术群聊的第四期。还是那句话,《群聊免费进入》。一起来看看本期分享内容。 uiautomator-Error问题 在使用u2的过程中,有时候需要使用到uiautomator这个工具来进行查阅层级。但是博主遇到了这么个问题。 《问题分析》:发生此问题的原因…...

冒泡排序/鸡尾酒排序
冒泡排序 冒泡排序(Bubble Sort)是一种简单的排序算法,它通过多次交换相邻元素的位置来实现排序。它的基本思想是从数组的第一个元素开始,比较相邻的两个元素,如果它们的顺序错误,则交换它们的位置。重复进…...

代码随想录算法训练营第五十三天|309.最佳买卖股票时机含冷冻期、714.买卖股票的最佳时机含手续费
代码随想录算法训练营第五十三天|309.最佳买卖股票时机含冷冻期、714.买卖股票的最佳时机含手续费 309.最佳买卖股票时机含冷冻期714.买卖股票的最佳时机含手续费 309.最佳买卖股票时机含冷冻期 题目链接:309.最佳买卖股票时机含冷冻期 文章链接 状态:有…...

【Docker】Docker的使用案例以及未来发展、Docker Hub 服务、环境安全、容器部署安全
作者简介: 辭七七,目前大二,正在学习C/C,Java,Python等 作者主页: 七七的个人主页 文章收录专栏: 七七的闲谈 欢迎大家点赞 👍 收藏 ⭐ 加关注哦!💖…...

qt qtabwidget获取当前选项卡的所有按键
要获取当前选项卡中的所有按键,可以通过以下步骤进行: 通过currentIndex()函数获取当前选项卡的索引。 使用widget()函数获取当前选项卡的QWidget。 连接QWidget的keyPressEvent事件,并在事件处理函数中获取按下的按键信息。 下面是示例代…...

为什么Excel插入图片不显示,点击能够显示
很久没有Excel了,今天在做Excel表格时,发现上传图片后不能显示,但是点击还是能够出现图片的 点击如下 点击能看到,但是不显示? 最后发现只需鼠标右键点击浮动即可显示...

使用Python创建faker实例生成csv大数据测试文件并导入Hive数仓
文章目录 一、Python生成数据1.1 代码说明1.2 代码参考 二、数据迁移2.1 从本机上传至服务器2.2 检查源数据格式2.3 检查大小并上传至HDFS 三、beeline建表3.1 创建测试表并导入测试数据3.2 建表显示内容 四、csv文件首行列名的处理4.1 创建新的表4.2 将旧表过滤首行插入新表 一…...