网站网站建设网站/深圳网站营销seo电话
flink 环境构建工具类
public class ExecutionEnvUtil {/*** 从配置文件中读取配置(生效优先级:配置文件<命令行参数<系统参数)** @param args* @return org.apache.flink.api.java.utils.ParameterTool* @date 2023/8/4 - 10:05 AM*/public static ParameterTool createParameterTool(final String[] args) throws Exception {return ParameterTool.fromPropertiesFile(ExecutionEnvUtil.class.getResourceAsStream(BaseConstants.PROPERTIES_FILE_NAME)).mergeWith(ParameterTool.fromArgs(args)).mergeWith(ParameterTool.fromSystemProperties());}/*** flink 环境配置** @param parameterTool* @return org.apache.flink.streaming.api.environment.StreamExecutionEnvironment* @date 2023/8/4 - 11:10 AM*/public static StreamExecutionEnvironment prepare(ParameterTool parameterTool) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(parameterTool.getInt(PropertiesConstants.STREAM_PARALLELISM, 12));env.getConfig();env.getConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart(4, Time.seconds(60)));if (parameterTool.getBoolean(PropertiesConstants.STREAM_CHECKPOINT_ENABLE, true)) {CheckPointUtil.setCheckpointConfig(env,parameterTool);// 取消作业时保留外部化 Checkpoint 数据env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);}env.getConfig().setGlobalJobParameters(parameterTool);env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);return env;}
}
checkpoint 工具类
public class CheckPointUtil {private static final String CHECKPOINT_MEMORY = "memory";private static final String CHECKPOINT_FS = "fs";private static final String CHECKPOINT_ROCKETSDB = "rocksdb";/*** 默认的checkpoint 存储地址*/private static final String CHECKPOINT_DEFAULT = "default";/*** 设置flink check point** @param env* @param parameterTool* @return org.apache.flink.streaming.api.environment.StreamExecutionEnvironment* @date 2023/8/4 - 10:49 AM*/public static StreamExecutionEnvironment setCheckpointConfig(StreamExecutionEnvironment env, ParameterTool parameterTool) throws Exception{// 根据类型,设置合适的状态后端String stateBackendType = parameterTool.get(PropertiesConstants.STREAM_CHECKPOINT_TYPE, CHECKPOINT_DEFAULT);if (CHECKPOINT_MEMORY.equalsIgnoreCase(stateBackendType)) {//1、state 存放在内存中,默认是 5MStateBackend stateBackend = new MemoryStateBackend(5 * 1024 * 1024 * 100);env.setStateBackend(stateBackend);}else if (CHECKPOINT_FS.equalsIgnoreCase(stateBackendType)) {StateBackend stateBackend = new FsStateBackend(new URI(parameterTool.get(PropertiesConstants.STREAM_CHECKPOINT_DIR)), 0, true);env.setStateBackend(stateBackend);}else if (CHECKPOINT_ROCKETSDB.equalsIgnoreCase(stateBackendType)) {RocksDBStateBackend rocksDBStateBackend = new RocksDBStateBackend(parameterTool.get(PropertiesConstants.STREAM_CHECKPOINT_DIR), true);env.setStateBackend(rocksDBStateBackend);}//设置 checkpoint 周期时间env.enableCheckpointing(parameterTool.getLong(PropertiesConstants.STREAM_CHECKPOINT_INTERVAL, 60000));//高级设置(这些配置也建议写成配置文件中去读取,优先环境变量)// 设置 exactly-once 模式env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);// 设置 checkpoint 最小间隔 500 msenv.getCheckpointConfig().setMinPauseBetweenCheckpoints(2*60000);// 设置 checkpoint 必须在n分钟内完成,否则会被丢弃env.getCheckpointConfig().setCheckpointTimeout(15*60000);// 设置 checkpoint 失败时,任务不会 fail,可容忍3次连续失败env.getCheckpointConfig().setTolerableCheckpointFailureNumber(3);// 设置 checkpoint 的并发度为 1env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);return env;}
}
构建kafak source 、sink
/*** 构建 source kafka** @param parameterTool* @return org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer<java.lang.String>* @date 2023/8/4 - 2:41 PM*/private static FlinkKafkaConsumer<String> buildSourceKafka(ParameterTool parameterTool){Properties props = KafkaConfigUtil.buildSourceKafkaProps(parameterTool);// 正则表达式消费FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>(Pattern.compile(parameterTool.get(PropertiesConstants.KAFKA_SOURCE_TOPIC)),new SimpleStringSchema(),props);kafkaConsumer.setCommitOffsetsOnCheckpoints(true);// 从最开始的位置开始消费if(parameterTool.getBoolean(PropertiesConstants.KAFKA_START_FROM_FIRST, false)){kafkaConsumer.setStartFromEarliest();}else{kafkaConsumer.setStartFromGroupOffsets();}return kafkaConsumer;}/*** 构建 sink kafka** @param parameterTool* @return org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer<com.alibaba.fastjson.JSONObject>* @date 2023/8/16 - 11:38 AM*/private static FlinkKafkaProducer<JSONObject> buildSinkKafka(ParameterTool parameterTool){Properties props = KafkaConfigUtil.buildSinkKafkaProps(parameterTool);return new FlinkKafkaProducer<>(parameterTool.get(PropertiesConstants.KAFKA_SINK_DEFAULT_TOPIC), (KafkaSerializationSchema<JSONObject>) (element, timestamp) ->new ProducerRecord<>(element.getString(BaseConstants.PARAM_LOG_TYPE), element.toJSONString().getBytes()),props, FlinkKafkaProducer.Semantic.AT_LEAST_ONCE);}
kafka 工具类
public class KafkaConfigUtil {/*** 设置 kafka 配置** @param parameterTool* @return java.util.Properties* @date 2023/8/4 - 2:39 PM*/public static Properties buildSourceKafkaProps(ParameterTool parameterTool) {Properties props = parameterTool.getProperties();props.put("bootstrap.servers", parameterTool.get(PropertiesConstants.KAFKA_BROKERS, DEFAULT_KAFKA_BROKERS));props.put("group.id", parameterTool.get(PropertiesConstants.KAFKA_GROUP_ID, DEFAULT_KAFKA_GROUP_ID));props.put("flink.partition-discovery.interval-millis", "10000");props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("auto.offset.reset", "latest");props.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"username\" password=\"password\";");props.put("security.protocol", "SASL_PLAINTEXT");props.put("sasl.mechanism", "PLAIN");//0817 - 消费kafka数据超时时间和尝试次数props.put("request.timeout.ms", "30000");props.put("retries", 5);return props;}/*** 构建 sink kafka 配置** @param parameterTool* @return java.util.Properties* @date 2023/8/14 - 5:54 PM*/public static Properties buildSinkKafkaProps(ParameterTool parameterTool) {Properties props = parameterTool.getProperties();props.put("bootstrap.servers", parameterTool.get(PropertiesConstants.KAFKA_SINK_BROKERS));props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("auto.offset.reset", "latest");props.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"username\" password=\"password\";");props.put("security.protocol", "SASL_PLAINTEXT");props.put("sasl.mechanism", "PLAIN");props.setProperty(ProducerConfig.RETRIES_CONFIG, "5");props.put(ProducerConfig.ACKS_CONFIG, "1");props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, "300");return props;}}
jdbc 工具类
public class JdbcDatasourceUtils {public static volatile Map<String, HikariDataSource> DATASOURCES = new ConcurrentHashMap<>();/*** 获取hikari数据库链接池** @param jdbcUrl* @param dsUname* @param dsPwd* @param dsDriver* @return com.zaxxer.hikari.HikariDataSource* @date 2023/8/9 - 2:23 PM*/public static HikariDataSource getHikariDataSource(String jdbcUrl, String dsUname, String dsPwd, String dsDriver) {String md5Key = Md5Util.encrypt(jdbcUrl + " " + dsUname + " " + dsPwd + " " + dsDriver);if (!DATASOURCES.containsKey(md5Key)) {synchronized (JdbcDatasourceUtils.class) {if (!DATASOURCES.containsKey(md5Key)) {DATASOURCES.put(md5Key, createHikariDataSource(jdbcUrl, dsUname, dsPwd, dsDriver));}}}return DATASOURCES.get(md5Key);}/*** 构建hikari数据库链接池** @param jdbcUrl* @param dsUname* @param dsPwd* @param dsDriver* @return com.zaxxer.hikari.HikariDataSource* @date 2023/8/9 - 2:14 PM*/private static HikariDataSource createHikariDataSource(String jdbcUrl, String dsUname, String dsPwd, String dsDriver) {HikariConfig config = new HikariConfig();config.setJdbcUrl(jdbcUrl);config.setUsername(dsUname);config.setPassword(dsPwd);config.setDriverClassName(dsDriver);// 从池返回的连接的默认自动提交,默认值:trueconfig.setAutoCommit(true);//只读config.setReadOnly(true);// 连接超时时间:毫秒,默认值30秒config.setConnectionTimeout(10000);// 最大连接数config.setMaximumPoolSize(32);// 最小空闲连接config.setMinimumIdle(16);// 空闲连接超时时间config.setIdleTimeout(600000);// 连接最大存活时间config.setMaxLifetime(540000);// 连接测试查询config.setConnectionTestQuery("SELECT 1");return new HikariDataSource(config);}/*** 按列加载数据** @param dataSource* @param sql* @return java.util.List<java.util.Map<java.lang.String,java.lang.Object>>* @date 2023/8/15 - 6:03 PM*/public static List<Map<String, Object>> loadDatas(HikariDataSource dataSource, String sql) {return loadSql(dataSource, sql, resultSet -> {List<Map<String, Object>> datas = new ArrayList<>();try {if (null == resultSet){return datas;}ResultSetMetaData metaData = resultSet.getMetaData();//组装返回值Map<String, Object> entry;while (resultSet.next()) {entry = new LinkedHashMap<>();// getColumnLabel 取重命名,getColumnName 原始字段名for (int i = 1; i <= metaData.getColumnCount(); i++) {entry.put(metaData.getColumnLabel(i), resultSet.getObject(i));}datas.add(entry);}} catch (Exception e) {e.printStackTrace();}return datas;});}/*** 加载数据遍历放入set集合** @param dataSource* @param sql* @param function* @return java.util.Set<R>* @date 2023/8/15 - 6:03 PM*/public static <R> Set<R> loadSetDatas(HikariDataSource dataSource, String sql, Function<Object, R> function) {return loadSql(dataSource, sql, resultSet -> {Set<R> datas = new LinkedHashSet<>();try {if (null == resultSet){return datas;}ResultSetMetaData metaData = resultSet.getMetaData();while (resultSet.next()) {for (int i = 1; i <= metaData.getColumnCount(); i++) {datas.add(function.apply(resultSet.getObject(i)));}}} catch (Exception e) {e.printStackTrace();}return datas;});}/*** 执行查询sql** @param dataSource* @param sql* @param function* @return R* @date 2023/8/15 - 6:03 PM*/private static <R> R loadSql(HikariDataSource dataSource, String sql, Function<ResultSet, R> function) {Connection connection = null;PreparedStatement preparedStatement = null;ResultSet resultSet = null;try {connection = dataSource.getConnection();preparedStatement = connection.prepareStatement(sql);resultSet = preparedStatement.executeQuery();return function.apply(resultSet);} catch (Exception e){e.printStackTrace();} finally {if (connection != null){try {connection.close();} catch (SQLException e) {e.printStackTrace();}}if (preparedStatement != null){try {preparedStatement.close();} catch (SQLException e) {e.printStackTrace();}}if (resultSet != null){try {resultSet.close();} catch (SQLException e) {e.printStackTrace();}}}return function.apply(null);}
}
相关文章:

Flink java 工具类
flink 环境构建工具类 public class ExecutionEnvUtil {/*** 从配置文件中读取配置(生效优先级:配置文件<命令行参数<系统参数)** param args* return org.apache.flink.api.java.utils.ParameterTool* date 2023/8/4 - 10:05 AM*/public static …...

2023年你需要知道的最佳预算Wi-Fi路由器清单
买新路由器?让我们帮助你挑选一些既有很多功能和性能,又经济实惠的产品。 购买Wi-Fi路由器并不一定要倾家荡产,尤其是如果你不需要一个提供数百Mbps速度的路由器。廉价路由器是一个很好的选择,它包含了许多功能,不会对钱包造成影响。 一、2023年在廉价Wi-Fi路由器中寻找…...

Go语言基础之流程控制
流程控制是每种编程语言控制逻辑走向和执行次序的重要部分,流程控制可以说是一门语言的“经脉”。 Go语言中最常用的流程控制有if和for,而switch和goto主要是为了简化代码、降低重复代码而生的结构,属于扩展类的流程控制。 if else(分支结构…...

Git 安装、配置并把项目托管到码云 Gitee
错误聚集篇: 由于我 git 碰见大量错误,所以集合了一下: git 把项目托管到 码云出现的错误集合_打不着的大喇叭的博客-CSDN博客https://blog.csdn.net/weixin_49931650/article/details/132460492 1、安装 git 1.1 安装步骤 1.1.1 下载对应…...

C++信息学奥赛1147:最高分数的学生姓名
#include <iostream> #include <string> using namespace std; int main() {int n;// 输入一个整数ncin>>n;cin.ignore();string arr;string str;int max0;int fen;// 循环读取n个评分和对应的字符串for(int i0;i<n;i){cin>>fen>>arr;if(fen&…...

STM32使用PID调速
STM32使用PID调速 PID原理 PID算法是一种闭环控制系统中常用的算法,它结合了比例(P)、积分(I)和微分(D)三个环节,以实现对系统的控制。它的目的是使 控制系统的输出值尽可能接近预…...

【UE5:CesiumForUnreal】——3DTiles数据属性查询和单体高亮
目录 0.1 效果展示 0.2 实现步骤 1 数据准备 2 属性查询 2.1 射线检测 2.2 获取FeatureID 2.3 属性查询 2.4 属性显示 3 单体高亮 3.1 构建材质参数集 3.2 材质参数设置 3.3 添加Cesium Encode Metadata插件 3.4 从纹理中取出特定FeatureId属性信息 3.5 创建…...

无涯教程-PHP - 返回类型声明
在PHP 7中,引入了一个新函数返回类型声明,返回类型声明指定函数应返回的值的类型,可以声明返回类型的以下类型。 intfloatbooleanstringinterfacesarraycallable 有效返回类型 <?phpdeclare(strict_types1);function returnIntValue(i…...

DOS常见命令
DOS常见命令 DOS是什么如何打开DOScmd常见的命令集合 DOS是什么 DOC命令是我们浏览器中的终端 ,但不同的是我们打开软件的方式 使用的是点击文件图标,点击图标的同时 我们也相当于使用一个命令 只是我们看不见而已 在电脑上操作的时候 通常都是使用命令…...

Qt应用开发(拓展篇)——示波器/图表 QCustomPlot
一、介绍 QCustomPlot是一个用于绘图和数据可视化的Qt C小部件。它没有进一步的依赖关系,提供友好的文档帮助。这个绘图库专注于制作好看的,出版质量的2D绘图,图形和图表,以及为实时可视化应用程序提供高性能。 QCustomPl…...

【精度丢失】后端接口返回的Long类型参数,不同浏览器解析出的结果不一样
1、业务背景 有个同事找我帮他看一个问题,他给前端提供了一个接口。 这个接口是用来反查id的,他这里这个参数正常的返回值应该是 283232039247028226。 但前端反馈他,前端在浏览器(火狐)获取的值是 283232039247028…...

2023年国赛 高教社杯数学建模思路 - 案例:感知机原理剖析及实现
文章目录 1 感知机的直观理解2 感知机的数学角度3 代码实现 4 建模资料 # 0 赛题思路 (赛题出来以后第一时间在CSDN分享) https://blog.csdn.net/dc_sinor?typeblog 1 感知机的直观理解 感知机应该属于机器学习算法中最简单的一种算法,其…...

java-红黑树
节点内部存储 红黑树规则 或者: 红黑树添加节点规则: 添加节点默认是红色的(效率高) 红黑树示例 注:红黑树增删改查性能都很好...

vue2 vue中的常用指令
一、为什么要学习Vue 1.前端必备技能 2.岗位多,绝大互联网公司都在使用Vue 3.提高开发效率 4.高薪必备技能(Vue2Vue3) 二、什么是Vue 概念:Vue (读音 /vjuː/,类似于 view) 是一套 **构建用户界面 ** 的 渐进式 …...

AI驱动下的智能制造:工业自动化的新纪元
随着人工智能(AI)技术的持续进步,其在工业自动化领域的影响日益显著。作为现代科技的代表,AI不仅为各行业带来了前所未有的商机和技术思路,更在工业自动化领域中引发了一场深刻的变革。本文将深入探讨AI对智能制造的影…...

docker 命令
一、docker命令 1、镜像保存 docker save imageid -o modelzoozl.tar #把镜像保存到本地 docker load -i dockername #把tar包load下来,load成镜像 docker export CONTAINERID/CONTAINERNAME -o modelzoozl.tar #把启动着的镜像导出 docker import modelzo…...

2023年高教社杯数学建模思路 - 复盘:光照强度计算的优化模型
文章目录 0 赛题思路1 问题要求2 假设约定3 符号约定4 建立模型5 模型求解6 实现代码 建模资料 0 赛题思路 (赛题出来以后第一时间在CSDN分享) https://blog.csdn.net/dc_sinor?typeblog 1 问题要求 现在已知一个教室长为15米,宽为12米&…...

生成式人工智能的潜在有害影响与未来之路(二)
利润高于隐私:不透明数据收集增加 背景和风险 生成型人工智能工具建立在各种大型、复杂的机器学习模型之上,这些模型需要大量的训练数据才能发挥作用。对于像ChatGPT这样的工具,数据包括从互联网上抓取的文本。对于像Lensa或Stable Diffusi…...

如何自己实现一个丝滑的流程图绘制工具(三)自定义挂载vue组件
背景 bpmn-js是个流程图绘制的工具,但是现在我希望实现的是,绘制的不是节点而是一个vue组件。 保留线的拖拽和连接。 方案 那就说明不是依赖于节点的样式,找到了他有个属性,就是类似覆盖节点的操作。 思路就是用vue组件做遮罩&…...

UNIAPP调用API接口
API:开发者可以通过这些接口与其它程序进行交互,获取所需数据或者执行指定操作。 网络请求 API: UniApp 中内置了网络请求 API,方便调用 uni.request uni.uploadFile uni.request 接口主要用于实现网络请求。GET 和 POST 是使用最普遍的两种…...

理解 Delphi 的类(五) - 认识类的继承
先新建一个 VCL Forms Application 工程, 代码中就已经出现了两个类: 一个是 TForm 类; 一个是 TForm1 类; TForm1 继承于 TForm. TForm 是 TForm1 的父类; TForm1 是 TForm 的子类. unit Unit1;interfaceusesWindows, Messages, SysUtils, Variants, Classes, Graphics, Contr…...

mybatis概述及搭建
目录 1.概述 2.mybatis搭建 1.创建一个maven项目,添加mybatis、mysql所依赖的jar 2.创建一个数据库表,及对应的java类 3.创建一个mybatis的核心配置文件,配置数据库连接信息,配置sql映射文件 4.创建sql映射文件,…...

DNDC模型---土壤碳储量、温室气体排放、农田减排、土地变化、气候变化中的应用
由于全球变暖、大气中温室气体浓度逐年增加等问题的出现,“双碳”行动特别是碳中和已经在世界范围形成广泛影响。国家领导人在多次重要会议上讲到,要把“双碳”纳入经济社会发展和生态文明建设整体布局。同时,提到要把减污降碳协同增效作为促…...

Android studio 2022.3.1 鼠标移动时不显示快速文档
在使用技术工具的过程中,我们时常会遇到各种各样的问题和挑战。最近,我升级了我的Android Studio到2022.3.1版本,但是在使用过程中,我碰到了一个让我颇为困扰的问题:在鼠标移动到类名或字段上时,原本应该显…...
五度易链最新“产业大数据服务解决方案”亮相,打造数据引擎,构建智慧产业!
快来五度易链官网 点击网址【http://www.wdsk.net/】 看看我们都发布了哪些新功能!!! 自2015年布局产业大数据服务行业以来,“五度易链”作为全国产业大数据服务行业先锋企业,以“让数据引领决策,以智慧驾驭未来”为愿景,肩负“打…...

简述hive环境搭建
文章目录 部署参数配置hive简单命令 部署 Hive的三种部署模式,主要按Metastore 的运行模式进行区分。 在安装Hive之前,要求先预装JDK 8、Hadoop、MySQL ; 1.下载hive,并解压缩到用户主目录下 tar -xzvf apache-hive-2.3.6-bin.t…...

小米AI音箱联网升级折腾记录(解决配网失败+升级失败等问题)
小米AI音箱(一代)联网升级折腾记录 我折腾了半天终于勉强能进入下载升级包这步,算是成功一半吧… 总结就是,网络信号一定要好,需要不停换网找到兼容的网,还需要仔细配置DNS让音响连的上api.mina.mi.com 推荐…...

tensorRT安装
官方指导文档:Installation Guide :: NVIDIA Deep Learning TensorRT Documentation 适配很重要!!!! 需要cuda, cuDNN, tensorRT三者匹配。我的cuda11.3 所以对应的cuDNN和tensorRT下载的是如下版本: cud…...

电脑重装+提升网速
https://www.douyin.com/user/self?modal_id7147216653720341767&showTabfavorite_collectionhttps://www.douyin.com/user/self?modal_id7147216653720341767&showTabfavorite_collection 零封有哈数的主页 - 抖音 (douyin.com)https://www.douyin.com/user/self?…...

Modelica由入门到精通—为什么要学习Modelica语言
1.为什么要学习Modelica语言 本人正在研究Modelica 多领域统一建模仿真语言,特此做学习入门介绍,希望可以帮助需要的小伙伴。 文章目录 1.为什么要学习Modelica语言一、背景二、系统建模与仿真2.1 系统仿真与系统模型2.2 仿真价值与可靠性 三、物理建模…...