flink on yarn with kerberos 边缘提交
flink on yarn 带kerberos 远程提交 实现
- flink kerberos 配置

- 先使用ugi进行一次认证
- 正常提交
import com.google.common.io.Files;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.io.FileUtils;
import org.apache.flink.client.cli.CliFrontend;
import org.apache.flink.client.cli.CustomCommandLine;
import org.apache.flink.client.cli.DefaultCLI;
import org.apache.flink.client.cli.GenericCLI;
import org.apache.flink.client.deployment.ClusterDeploymentException;
import org.apache.flink.client.deployment.ClusterSpecification;
import org.apache.flink.client.deployment.application.ApplicationConfiguration;
import org.apache.flink.client.program.ClusterClientProvider;
import org.apache.flink.configuration.*;
import org.apache.flink.runtime.security.SecurityConfiguration;
import org.apache.flink.runtime.security.SecurityUtils;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.yarn.YarnClientYarnClusterInformationRetriever;
import org.apache.flink.yarn.YarnClusterDescriptor;
import org.apache.flink.yarn.YarnClusterInformationRetriever;
import org.apache.flink.yarn.configuration.YarnConfigOptions;
import org.apache.flink.yarn.configuration.YarnDeploymentTarget;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.junit.Test;import java.io.File;
import java.io.IOException;
import java.lang.reflect.Constructor;
import java.lang.reflect.UndeclaredThrowableException;
import java.net.MalformedURLException;
import java.util.*;
import java.util.stream.Collectors;
import java.util.stream.Stream;import static org.apache.flink.util.Preconditions.checkNotNull;/**
* @author: jiayeli.cn
* @description
* @date: 2023/8/29 下午9:09
*/@Slf4j
public class YarnClientTestCase {@Testpublic void submitJobWithYarnDesc() throws ClusterDeploymentException, IOException {// hadoopString hadoopConfDir = "/x/x/software/spark-3.3.2-bin-hadoop3/etc/hadoop";//flink的本地配置目录,为了得到flink的配置String flinkConfDir = "/opt/flink-1.14.3/conf";//存放flink集群相关的jar包目录String flinkLibs = "hdfs://node01:8020/lib/flink";//用户jarString userJarPath = "hdfs://node01:8020/jobs/streaming/testCase/TopSpeedWindowing.jar";String flinkDistJar = "hdfs://node01:8020/lib/flink/flink-dist_2.12-1.14.3.jar";String[] args = "".split("\\s+");String appMainClass = "org.apache.flink.streaming.examples.windowing.TopSpeedWindowing";String principal = "dev@JIAYELI.COM";String keyTab = "/x/x/workspace/bigdata/sparkLauncherTestcase/src/test/resource/dev_uer.keytab";enableKrb5(principal, keyTab);YarnClient yarnClient = YarnClient.createYarnClient();YarnConfiguration yarnConfiguration = new YarnConfiguration();Optional.ofNullable(hadoopConfDir).map(e -> new File(e)).filter(dir -> dir.exists()).map(File::listFiles).ifPresent(files -> {Arrays.asList(files).stream().filter(file -> Files.getFileExtension(file.getName()).equals(".xml")).forEach(conf -> yarnConfiguration.addResource(conf.getPath()));});yarnClient.init(yarnConfiguration);yarnClient.start();Configuration flinkConf = GlobalConfiguration.loadConfiguration(flinkConfDir);//set run modelflinkConf.setString(DeploymentOptions.TARGET, YarnDeploymentTarget.APPLICATION.getName());//set application nameflinkConf.setString(YarnConfigOptions.APPLICATION_NAME, "onYarnApiSubmitCase");//flink on yarn dependencyflinkConf.set(YarnConfigOptions.PROVIDED_LIB_DIRS, Collections.singletonList(new Path(flinkLibs).toString()));flinkConf.set(YarnConfigOptions.FLINK_DIST_JAR, flinkDistJar);flinkConf.set(PipelineOptions.JARS, Collections.singletonList(new Path(userJarPath).toString()));//设置:资源/并发度flinkConf.setInteger(CoreOptions.DEFAULT_PARALLELISM, 1);flinkConf.set(JobManagerOptions.TOTAL_PROCESS_MEMORY, MemorySize.parse("1G"));flinkConf.set(TaskManagerOptions.TOTAL_PROCESS_MEMORY, MemorySize.parse("1G"));flinkConf.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 1);ClusterSpecification clusterSpecification = new ClusterSpecification.ClusterSpecificationBuilder().setMasterMemoryMB(1024).setTaskManagerMemoryMB(1024).setSlotsPerTaskManager(2).createClusterSpecification();YarnClusterInformationRetriever ycir = YarnClientYarnClusterInformationRetriever.create(yarnClient);YarnConfiguration yarnConf = (YarnConfiguration) yarnClient.getConfig();ApplicationConfiguration appConfig = new ApplicationConfiguration(args, appMainClass);YarnClusterDescriptor yarnClusterDescriptor = new YarnClusterDescriptor(flinkConf,yarnConf,yarnClient,ycir,false);ClusterClientProvider<ApplicationId> applicationCluster =yarnClusterDescriptor.deployApplicationCluster( clusterSpecification, appConfig );yarnClient.stop();}private void enableKrb5(String principal, String keyTab) throws IOException {System.setProperty("java.security.krb5.conf", "/x/x/Documents/kerberos/krb5.conf");org.apache.hadoop.conf.Configuration krb5conf = new org.apache.hadoop.conf.Configuration();String krb5ConfPath = "/x/x/Documents/kerberos/krb5.conf";krb5conf.set("hadoop.security.authentication", "kerberos");// UserGroupInformation.setConfiguration(conf)UserGroupInformation.setConfiguration(krb5conf);// 登录Kerberos并获取UserGroupInformation实例UserGroupInformation.loginUserFromKeytab(principal, keyTab);UserGroupInformation ugi = UserGroupInformation.getCurrentUser();log.debug(ugi.toString());}
相关文章:
flink on yarn with kerberos 边缘提交
flink on yarn 带kerberos 远程提交 实现 flink kerberos 配置 先使用ugi进行一次认证正常提交 import com.google.common.io.Files; import lombok.extern.slf4j.Slf4j; import org.apache.commons.io.FileUtils; import org.apache.flink.client.cli.CliFrontend; import o…...
NodeJS的简介以及下载和安装
本章节会带大家下载并安装NodeJs 以及简单的入门,配有超详细的图片,一步步带大家进行下载与安装 NodeJs简介关于前端与后端Node是什么?为什么要学习NodeNodeJS的优点: NodeJS的下载与安装NodeJS的下载: NodeJS的快速入…...
量化面试-概率题
文章目录 一、题目1.糖果罐(绿皮书79页)2 折木棍(绿皮书89页)3 第一张ACE(绿皮书95页)4 n个均匀分布之和(绿皮书95页) 二、答案1. 糖果罐2 折木棍3 第一张ACE4 n个均匀分布之和 一、…...
【spark】java类在spark中的传递,scala object在spark中的传递
记录一个比较典型的问题,先讲一下背景,有这么一个用java写的类 public class JavaClass0 implements Serializable {private static String name;public static JavaClass0 getName(String str) {if (name null) {namestr;}return name;}... }然后在sp…...
php 文字生成图片保存到本地
你可以使用PHP的GD库来生成图片并保存到本地。首先,你需要确保你的PHP环境已经安装了GD库。然后,你可以使用GD库的函数来创建一个画布,并在上面绘制文字。最后,使用imagepng或imagejpeg函数将画布保存为PNG或JPEG格式的图片文件。…...
面试手撕—二叉搜索树及其后序遍历
一、引言 在面试地平线的时候,聊到了二叉搜索树,让手撕二叉搜索树,以下是要求 1、用类模板实现二叉搜索树 2、写一个函数,实现给一个vector数组,转换成二叉搜索树 3、写出二叉搜索树的后序遍历 二、代码实现 #inc…...
Java数据结构面试题以及答案
本专栏记录Java后端开发相关的面试题,欢迎大家阅读专栏的其他文章。 目录 1.B树和B树的区别?B树和B树的优点分别是? 2.排序算法的种类和复杂度 3.HashMap和Hashtable的原理、区别、应用场景 4.ConcurrentHashMap的原理、应用场景 5.Arra…...
Java——它要求用户输入一个整数(实际上是一个字符串),然后计算该整数的平方值,并将结果输出。
这是一个Java程序,它要求用户输入一个整数(实际上是一个字符串),然后计算该整数的平方值,并将结果输出。程序的基本流程如下: 首先,声明并初始化变量data和result,它们的初始值都为…...
【科研论文配图绘制】task6直方图绘制
【科研论文配图绘制】task6直方图绘制 task6 主要掌握直方图的绘制技巧,了解直方图含义,清楚统计指标的添加方式 1.直方图 直方图是一种用于表示数据分布和离散情况的统计图形,它的外观和柱形图相近,但它所 表达的含义和柱形图…...
Leetcode刷题:395. 至少有 K 个重复字符的最长子串、823. 带因子的二叉树
Leetcode刷题:395. 至少有 K 个重复字符的最长子串、823. 带因子的二叉树 1. 395. 至少有 K 个重复字符的最长子串算法思路参考代码和运行结果 2. 823. 带因子的二叉树算法思路参考代码和运行结果 1. 395. 至少有 K 个重复字符的最长子串 题目难度:中等 标签&#…...
java八股文面试[多线程]——Synchronized的底层实现原理
笔试:画出Synchronized 线程状态流转实现原理图 synchronized关键字解决的是多个线程之间访问资源的同步性,synchronized 翻译为中文的意思是同步,也称之为”同步锁“。 synchronized的作用是保证在同一时刻, 被修饰的代码块或方…...
C#,《小白学程序》第三课:类、类数组与排序
类class把数值与功能巧妙的进行了结合,是编程技术的主要进步。 下面的程序你可以确立 分数 与 姓名 之间关系,并排序。 1 文本格式 /// <summary> /// 同学信息类 /// </summary> public class Classmate { /// <summary> /…...
史上最全AP、mAP详解与代码实现
文章目录 前言一、mAP原理1、mAP概念2、准确率3、精确率4、召回率5、AP: Average Precision 二、mAP0.5与mAP0.5:0.951、mAP0.52、mAP0.5:0.95 三、mAP代码实现1、真实标签json文件格式2、模型预测标签json文件格式3、mAP代码实现4、mAP结果显示 四、模型集成mAP代码1、模型mai…...
百数应用中心——生产制造管理解决方案解决行业难题
传统生产制造业面临着许多挑战,其中一些主要问题包括效率低下、交期压力大、需求预测不准确、生产模式复杂、异常响应慢、库存高和计划脱节等。这些问题不仅影响了生产效率和质量,也导致了不必要的成本和客户满意度下降。 生产制造管理应用对于企业的生产…...
《存储IO路径》专题:IO虚拟化初探
大家好,欢迎来到今天的科技小课堂。今天我们要聊聊的是一项非常有趣且实用的技术——I/O虚拟化(Input/Output Virtualization,简称IOV)。想象一下,如果把物理硬件资源比作一道丰盛的大餐,那么IOV就是那位神…...
Springboot2.0快速入门(第一章)
目录 一,SpringBoot简介1.1,回顾什么是Spring1.2,Spring是如何简化Java开发的1.3,什么是SpringBoot 二,Hello,World2.1,准备工作2.2,创建基础项目说明2.3,创建第一个Hell…...
Flink流批一体计算(17):PyFlink DataStream API之StreamExecutionEnvironment
目录 StreamExecutionEnvironment Watermark watermark策略简介 使用 Watermark 策略 内置水印生成器 处理空闲数据源 算子处理 Watermark 的方式 创建DataStream的方式 通过list对象创建 使用DataStream connectors创建 使用Table & SQL connectors…...
javeee spring cglib动态代理
cglib动态代理 依赖 <dependency><groupId>cglib</groupId><artifactId>cglib-nodep</artifactId><version>3.2.4</version></dependency>代理类 package com.test.cglibProxy;import net.sf.cglib.proxy.Enhancer; import …...
【Docker】Dockerfile介绍
Dockerfile是一个文本文件,其中包含了一系列的指令,用于构建Docker镜像。这些指令可以用来自动化镜像的构建过程,并创建自定义镜像。 以下是一些常用的Dockerfile指令及其功能: FROM:指定基础镜像。这是Dockerfile中…...
两个hdfs之间迁移传输数据
本文参考其他大数据大牛的博文做了整理和实际验证,主要解决hdfs跨集群复制/迁移问题。 在hdfs数据迁移时总会涉及到两个hdfs版本版本问题,致力解决hdfs版本相同和不同两种情况的处理方式,长话短说,进正文。 distcp: hadoop自带的…...
Debian系统简介
目录 Debian系统介绍 Debian版本介绍 Debian软件源介绍 软件包管理工具dpkg dpkg核心指令详解 安装软件包 卸载软件包 查询软件包状态 验证软件包完整性 手动处理依赖关系 dpkg vs apt Debian系统介绍 Debian 和 Ubuntu 都是基于 Debian内核 的 Linux 发行版ÿ…...
数据库分批入库
今天在工作中,遇到一个问题,就是分批查询的时候,由于批次过大导致出现了一些问题,一下是问题描述和解决方案: 示例: // 假设已有数据列表 dataList 和 PreparedStatement pstmt int batchSize 1000; // …...
10-Oracle 23 ai Vector Search 概述和参数
一、Oracle AI Vector Search 概述 企业和个人都在尝试各种AI,使用客户端或是内部自己搭建集成大模型的终端,加速与大型语言模型(LLM)的结合,同时使用检索增强生成(Retrieval Augmented Generation &#…...
python报错No module named ‘tensorflow.keras‘
是由于不同版本的tensorflow下的keras所在的路径不同,结合所安装的tensorflow的目录结构修改from语句即可。 原语句: from tensorflow.keras.layers import Conv1D, MaxPooling1D, LSTM, Dense 修改后: from tensorflow.python.keras.lay…...
NPOI操作EXCEL文件 ——CAD C# 二次开发
缺点:dll.版本容易加载错误。CAD加载插件时,没有加载所有类库。插件运行过程中用到某个类库,会从CAD的安装目录找,找不到就报错了。 【方案2】让CAD在加载过程中把类库加载到内存 【方案3】是发现缺少了哪个库,就用插件程序加载进…...
R 语言科研绘图第 55 期 --- 网络图-聚类
在发表科研论文的过程中,科研绘图是必不可少的,一张好看的图形会是文章很大的加分项。 为了便于使用,本系列文章介绍的所有绘图都已收录到了 sciRplot 项目中,获取方式: R 语言科研绘图模板 --- sciRplothttps://mp.…...
对象回调初步研究
_OBJECT_TYPE结构分析 在介绍什么是对象回调前,首先要熟悉下结构 以我们上篇线程回调介绍过的导出的PsProcessType 结构为例,用_OBJECT_TYPE这个结构来解析它,0x80处就是今天要介绍的回调链表,但是先不着急,先把目光…...
CTF show 数学不及格
拿到题目先查一下壳,看一下信息 发现是一个ELF文件,64位的 用IDA Pro 64 打开这个文件 然后点击F5进行伪代码转换 可以看到有五个if判断,第一个argc ! 5这个判断并没有起太大作用,主要是下面四个if判断 根据题目…...
比特币:固若金汤的数字堡垒与它的四道防线
第一道防线:机密信函——无法破解的哈希加密 将每一笔比特币交易比作一封在堡垒内部传递的机密信函。 解释“哈希”(Hashing)就是一种军事级的加密术(SHA-256),能将信函内容(交易细节…...
理想汽车5月交付40856辆,同比增长16.7%
6月1日,理想汽车官方宣布,5月交付新车40856辆,同比增长16.7%。截至2025年5月31日,理想汽车历史累计交付量为1301531辆。 官方表示,理想L系列智能焕新版在5月正式发布,全系产品力有显著的提升,每…...
