当前位置: 首页 > news >正文

新疆网站建设公司/设计公司网站模板

新疆网站建设公司,设计公司网站模板,漳州市长泰县建设局网站,网站建设协议书是否贴花代码如下: #!/usr/bin/python # -*- coding: UTF-8 -*-from pyflink.datastream import StreamExecutionEnvironment from pyflink.table import StreamTableEnvironment, EnvironmentSettings, DataTypes# 初始化执行环境 s_env StreamExecutionEnvironment.get_…

代码如下:

#!/usr/bin/python
# -*- coding: UTF-8 -*-from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment, EnvironmentSettings, DataTypes# 初始化执行环境
s_env = StreamExecutionEnvironment.get_execution_environment()
s_env.set_parallelism(4)
# 必须开启 checkpoint,时间间隔为毫秒,否则不能输出数据
s_env.enable_checkpointing(600000)  # 600000 十分钟env_settings = EnvironmentSettings.new_instance().in_streaming_mode().build()
t_env = StreamTableEnvironment.create(s_env, environment_settings=env_settings)# 显式注册 Kafka 连接器
t_env.get_config().get_configuration().set_string("pipeline.jars", "file:///opt/flink/lib/flink-connector-kafka-1.16.0.jar")# 创建 source 表
t_env.execute_sql("""
CREATE TABLE source_blog_kafka (ip_address STRING,post_id INT,event_type STRING,proctime AS PROCTIME()  -- 添加处理时间列
) WITH ('connector' = 'kafka','topic' = 'blog_views','properties.bootstrap.servers' = '110.40.130.231:9092','properties.group.id' = 'flink_kafka_hive_blog_consumers','scan.startup.mode' = 'earliest-offset','format' = 'json'
)
""")# 创建 sink 表
t_env.execute_sql("""
CREATE TABLE sink_blog_hive (post_id INT,view_count INT
) WITH ('connector' = 'kafka','topic' = 'blog_view_stats','properties.bootstrap.servers' = '110.40.130.231:9092','format' = 'json'
)
""")# 定义 SQL 查询
t_env.execute_sql("""
INSERT INTO sink_blog_hive
SELECT post_id, CAST(COUNT(*) AS INT) AS view_count
FROM source_blog_kafka
GROUP BY post_id, TUMBLE(proctime, INTERVAL '1' MINUTE)
""")# 启动执行环境
t_env.execute("Blog View Stats")print("Data processing job started.")

展示了一个使用 Apache Flink 进行实时数据处理的例子,该例子从 Kafka 消费数据,对博客文章的浏览量进行统计,并将结果写回到另一个 Kafka 主题。以下是对你提供的代码的一些解释和建议:

解释

  1. 初始化执行环境

    • StreamExecutionEnvironment 和 StreamTableEnvironment 被用来创建流处理环境。set_parallelism(4) 设置了并行度为4,意味着任务可以在四个线程上并行执行。
    • enable_checkpointing(600000) 开启了检查点机制,确保在发生故障时能够恢复状态。这里设置的检查点间隔是十分钟。
  2. 配置 Kafka 连接器

    • 通过 pipeline.jars 参数指定了 Kafka 连接器的 JAR 文件路径,这是为了让 Flink 能够与 Kafka 集群通信。
  3. 定义 Source 和 Sink 表

    • source_blog_kafka 表用于从 Kafka 的 blog_views 主题读取数据,数据格式为 JSON。
    • sink_blog_hive 表用于将处理后的数据写入 Kafka 的 blog_view_stats 主题,同样以 JSON 格式存储。
  4. SQL 查询

    • 使用 SQL 对 source_blog_kafka 表中的数据按 post_id 分组,并计算每分钟内的点击次数,然后将结果插入到 sink_blog_hive 表中。
  5. 启动任务

    • 最后调用 t_env.execute("Blog View Stats") 启动任务执行。

建议

  • 检查点配置:确保 Kafka 连接器版本支持 Flink 的检查点功能,这对于实现精确一次(exactly-once)语义非常重要。
  • 资源管理:确保 Flink 集群有足够的资源来运行这个任务,特别是当并行度设置较高时。
  • 错误处理:考虑添加适当的错误处理逻辑,比如设置重启策略,以便在遇到失败时能够自动恢复。
  • 监控和调试:部署后,利用 Flink 提供的监控工具来跟踪作业的状态和性能,及时发现并解决问题。

相关文章:

Flink实现实时数据处理

代码如下: #!/usr/bin/python # -*- coding: UTF-8 -*-from pyflink.datastream import StreamExecutionEnvironment from pyflink.table import StreamTableEnvironment, EnvironmentSettings, DataTypes# 初始化执行环境 s_env StreamExecutionEnvironment.get_…...

11.9.2024刷华为

文章目录 HJ31 单词倒排HJ32 密码提取语法知识记录 傻逼OD题目又不全又要收费,看毛线,莫名奇妙 HW这叼机构别搁这儿害人得不得? 我觉得我刷完原来的题目 过一遍华为机考的ED卷出处,就行了 HJ31 单词倒排 游戏本做过了好像 HJ3…...

Chromium 中chrome.system.storage扩展接口定义c++

一、chrome.system.storage 您可以使用 chrome.system.storage API 查询存储设备信息,并在连接和分离可移动存储设备时收到通知。 权限 system.storage 类型 EjectDeviceResultCode 枚举 "success" 移除命令成功执行 - 应用可以提示用户移除设备。…...

【Qt聊天室客户端】登录窗口

1. 验证码 具体实现 登录界面中创建验证码图片空间&#xff0c;并添加到布局管理器中 主要功能概述&#xff08;创建一个verifycodewidget类专门实现验证码操作&#xff09; 详细代码 // 头文件#ifndef VERIFYCODEWIDGET_H #define VERIFYCODEWIDGET_H#include <QWidget>…...

如何显示模型特征权重占比图【数据分析】

可视化模型的特征权重 1、流程 1、导入库: numpy:用于处理数组和矩阵。 matplotlib.pyplot:用于绘图。 sklearn.datasets:用于加载数据集。 sklearn.ensemble.RandomForestClassifier:用于训练随机森林模型。2、加载数据集: 使用load_iris函数加载Iris数据集。3、训练模…...

Ubuntu24安装MySQL

下载deb包&#xff1a; 先更新系统包&#xff1a; sudo apt update sudo apt update -y下载mysql: wget https://dev.mysql.com/get/mysql-apt-config_0.8.17-1_all.deb 安装deb包&#xff1a; sudo dpkg -i mysql-apt-config_0.8.17-1_all.deb目前mysql还没有正式支持Ubun…...

微服务架构面试内容整理-Eureka

Spring Cloud Netflix 是一个为构建基于 Spring Cloud 的微服务应用提供的解决方案,利用 Netflix 的开源组件来实现常见的分布式系统功能。以下是 Spring Cloud Netflix 的一些主要组件和特点: 服务注册与发现:Eureka 是一个 RESTful 服务,用于注册和发现微服务。服务实例在…...

qt QErrorMessage详解

1、概述 QErrorMessage是Qt框架中用于显示错误消息的一个对话框类。它提供了一个简单的模态对话框&#xff0c;用于向用户显示错误或警告消息。QErrorMessage通常用于应用程序中&#xff0c;当需要向用户报告错误但不希望中断当前操作时。它提供了一个标准的错误消息界面&…...

SpringBoot 将多个Excel打包下载

在Spring Boot应用中&#xff0c;如果你需要将多个Excel文件打包成一个ZIP文件并提供下载&#xff0c;你可以使用一些Java库来帮助完成这个任务。这里我将展示如何使用Apache POI来生成Excel文件&#xff0c;以及使用Java.util.zip来创建ZIP文件&#xff0c;并通过Spring Boot的…...

分页存储小总结

知识点: 什么是分页存储? 将内存空间分为一个个大小相等的分区&#xff08;比如&#xff1a;每个分区4KB&#xff09;&#xff0c;每个分区就是一个“页框”&#xff08;页框页帧内存块物理块物理页面&#xff09;。每个页框有一个编号&#xff0c;即“页框号”&#xff08;…...

Star-CCM+应用篇之动力电池温度场仿真操作流程与方法

1 动力电池温度场仿真项目 电池包内模组温度分布、电芯温度分布、温升速率、充电时间等。 2 动力电池温度场仿真分析流程图 图1 电池包热流场分析流程 3 动力电池温度场仿真参数需求 类别...

Spring Boot应用开发:从入门到精通

Spring Boot应用开发&#xff1a;从入门到精通 Spring Boot是Spring框架的一个子项目&#xff0c;旨在简化Spring应用的初始搭建和开发过程。通过自动配置和约定大于配置的原则&#xff0c;Spring Boot使开发者能够快速构建独立的、生产级别的Spring应用。本文将深入探讨Sprin…...

【JAVA项目】基于jspm的【医院病历管理系统】

技术简介&#xff1a;采用jsp技术、MySQL等技术实现。 系统简介&#xff1a;通过标签分类管理等方式&#xff0c;实现管理员&#xff1b;个人中心、医院公告管理、用户管理、科室信息管理、医生管理、出诊信息管理、预约时间段管理、预约挂号管理、门诊病历管理、就诊评价管理、…...

Python中的常见配置文件写法

在软件开发过程中&#xff0c;开发者常常需要利用一些固定的参数或常量。对于这些相对恒定且频繁使用的元素&#xff0c;一种常见的做法是将它们集中存储在一个特定的文件中&#xff0c;以避免在多个模块代码中重复定义&#xff0c;从而维护核心代码的清晰度和整洁性。 具体而…...

语义分割实战——基于PSPnet神经网络动物马分割系统源码

第一步&#xff1a;准备数据 动物马分割数据&#xff0c;总共有328张图片&#xff0c;里面的像素值为0和1&#xff0c;所以看起来全部是黑的&#xff0c;不影响使用 第二步&#xff1a;搭建模型 psp模块的样式如下&#xff0c;其psp的核心重点是采用了步长不同&#xff0c;po…...

Python+Appium编写脚本

一、环境配置 1、安装JDK&#xff0c;版本1.8以上 2、安装Python&#xff0c;版本3.x以上&#xff0c;用来解释python 3、安装node.js&#xff0c;版本^14.17.0 || ^16.13.0 || >18.0.0&#xff0c;用来安装Appimu Server 4、安装npm&#xff0c;版本>8&#xff0c;用…...

RK3288 android7.1 适配 ilitek i2c接口TP

一&#xff0c;Ilitek 触摸屏简介 Ilitek 提供多种型号的触控屏控制器&#xff0c;如 ILI6480、ILI9341 等&#xff0c;采用 I2C 接口。 这些控制器能够支持多点触控&#xff0c;并具有优秀的灵敏度和响应速度。 Ilitek 的触摸屏控制器监测屏幕上的触摸事件。 当触摸发生时&am…...

C++ 越来越像函数式编程了!

C 越来越像函数式编程了 大家好&#xff0c;欢迎来到今天的博客话题。今天我们要聊的是 C 这门老牌的强类型语言是如何一步一步向函数式编程靠拢的。从最早的函数指针&#xff0c;到函数对象&#xff08;Functor&#xff09;&#xff0c;再到 std::function 和 std::bind&…...

maven工程结构说明

1、maven工程文件目录 |-- pom.xml # Maven 项目管理文件 |-- src # 放项目源文件|-- main # 项目主要代码| |-- java # Java 源代码目录| | -- com/example/myapp…...

【GESP】C++一级真题练习(202312)luogu-B3921,小杨的考试

GESP一级真题练习。为2023年12月一级认证真题。逻辑计算问题。 题目题解详见&#xff1a;【GESP】C一级真题练习(202312)luogu-B3921&#xff0c;小杨的考试 | OneCoder 【GESP】C一级真题练习(202312)luogu-B3921&#xff0c;小杨的考试 | OneCoderGESP一级真题练习。为2023…...

游戏中Dubbo类的RPC设计时的注意要点

一.消费方 1.需要使用到动态代理&#xff0c;代理指定的接口&#xff0c;这样子接口被调用时&#xff0c;就可以拿到&#xff1a;"类名 方法名参数返回值" 这些类型。 2.既然是rpc&#xff0c;那么接口被调用时&#xff0c;肯定在动态代理中会进行网络消息的发送&a…...

ARXML汽车可扩展标记性语言规范讲解

ARXML: Automotive Extensible Markup Language &#xff08;汽车可扩展标记语言&#xff09; xmlns: Xml name space &#xff08;xml 命名空间&#xff09; xsd: Xml Schema Definition (xml 架构定义) 1、XML与HTML的区别&#xff0c;可扩展。 可扩展&#xff0c;主要是…...

Hadoop(HDFS)

Hadoop是一个开源的分布式系统架构&#xff0c;旨在解决海量数据的存储和计算问题&#xff0c;Hadoop的核心组件包括Hadoop分布式文件系统&#xff08;HDFS&#xff09;、MapReduce编程模型和YARN资源管理器,最近需求需要用到HDFS和YARN。 文章目录 HDFS优缺点HDFS的读写原理 常…...

机器学习系列----梯度下降算法

梯度下降算法&#xff08;Gradient Descent&#xff09;是机器学习和深度学习中最常用的优化算法之一。无论是在训练神经网络、线性回归模型&#xff0c;还是其他类型的机器学习模型时&#xff0c;梯度下降都是不可或缺的一部分。它的核心目标是最小化一个损失函数&#xff08;…...

AI大模型:软件开发的未来之路

随着AI技术的快速发展&#xff0c;AI大模型正在对软件开发流程产生深远的影响。从代码自动生成到智能测试&#xff0c;AI大模型正在重塑软件开发的各个环节&#xff0c;为软件开发者、企业和整个产业链带来新的流程和模式变化。 首先&#xff0c;AI大模型的定义是指通过大规模…...

指标+AI+BI:构建数据分析新范式丨2024袋鼠云秋季发布会回顾

10月30日&#xff0c;袋鼠云成功举办了以“AI驱动&#xff0c;数智未来”为主题的2024年秋季发布会。大会深度探讨了如何凭借 AI 实现新的飞跃&#xff0c;重塑企业的经营管理方式&#xff0c;加速数智化进程。 作为大会的重要环节之一&#xff0c;袋鼠云数栈产品经理潮汐带来了…...

2024年第四届“网鼎杯”网络安全比赛---朱雀组Crypto- WriteUp

2024年第四届“网鼎杯”网络安全比赛---朱雀组Crypto-WriteUp Crypto&#xff1a;Crypto-2&#xff1a;Crypto-3&#xff1a; 前言&#xff1a;本次比赛已经结束&#xff0c;用于赛后复现&#xff0c;欢迎大家交流学习&#xff01; Crypto&#xff1a; Crypto-2&#xff1a; …...

关于Markdown的一点疑问,为什么很多人说markdown比word好用?

markdown和word压根不是一类工具&#xff0c;不存在谁比谁好&#xff0c;只是应用场景不一样。 你写博客、写readme肯定得markdown&#xff0c;但写合同、写简历肯定word更合适。 markdown和word类似邮箱和微信的关系&#xff0c;这两者都可以通信&#xff0c;但微信因为功能…...

NoSQL大数据存储技术测试(1)绪论

写在前面&#xff1a;未完成测试的同学&#xff0c;请先完成测试&#xff0c;此博文供大家复习使用&#xff0c;&#xff08;我的答案&#xff09;均为正确答案&#xff0c;大家可以放心复习 单项选择题 第1题 以下不属于云计算部署模型的是&#xff08; &#xff09; 公…...

Linux命令学习,git命令

Linux系统&#xff0c;Git是一个强大的版本管理系统&#xff0c;允许用户跟踪代码的更改、管理项目历史以及与他人协作。 Linux Git命令&#xff1a; 初始化仓库:当前目录创建一个Git仓库&#xff0c;生成.git隐藏目录存储版本历史和其他Git相关的元数据。 git init 克隆仓库…...