15_基于Flink将pulsar数据写入到ClickHouse
3.8.基于Flink将数据写入到ClickHouse
编写Flink完成数据写入到ClickHouse操作, 后续基于CK完成指标统计操作
3.8.1.ClickHouse基本介绍
ClickHouse 是俄罗斯的Yandex于2016年开源的列式存储数据库(DBMS),使用C++语言编写,主要用于在线分析处理查询(OLAP),能够使用SQL查询实时生成分析数据报告。

结论: ClickHouse像很多OLAP数据库一样,单表查询速度由于关联查询,而且ClickHouse的两者差距更为明显。
3.8.2.ClickHouse安装步骤
本项目中,我们仅需要安装单机测试版本即可使用(node2安装), 在实际生产中, 大家可以直接将分布式集群版本
- 1-设置yum源
sudo yum install yum-utils
sudo rpm --import https://repo.clickhouse.com/CLICKHOUSE-KEY.GPG
sudo yum-config-manager --add-repo https://repo.clickhouse.com/rpm/stable/x86_64
- 2- 直接基于yum安装即可
sudo yum install clickhouse-server clickhouse-client
- 3-修改配置文件
vim /etc/clickhouse-server/config.xml
修改178行: 打开这一行的注释
<listen_host>::</listen_host>

- 4-启动clickhouse的server
systemctl start clickhouse-server
停止:
systemctl stop clickhouse-server
重启
systemctl restart clickhouse-server
- 5-进入客户端

3.8.3.在ClickHouse中创建目标表
create database itcast_ck;
use itcast_ck;
create table itcast_ck.itcast_ck_ems(
id int,
sid varchar(128),
ip varchar(128),
create_time varchar(128),
session_id varchar(128),
yearInfo varchar(128),
monthInfo varchar(128),
dayInfo varchar(128),
hourInfo varchar(128),
seo_source varchar(128),
area varchar(128),
origin_channel varchar(128),
msg_count int(128),
from_url varchar(128),
PRIMARY KEY (`id`)
) ENGINE=ReplacingMergeTree();
3.8.4.编写Flink代码完成写入到CK操作
import com.itheima.pojo.PulsarTopicPojo;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.io.jdbc.JDBCAppendTableSink;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.pulsar.FlinkPulsarSource;
import org.apache.flink.streaming.connectors.pulsar.internal.JsonDeser;
import org.apache.flink.types.Row;import java.sql.Types;
import java.util.Properties;// 基于Flink完成读取Pulsar中数据将消息数据写入到clickhouse中
public class ItcastFlinkToClickHouse {public static void main(String[] args) throws Exception {//1. 创建Flinnk流式处理核心环境类对象 和 Table API 核心环境类对象StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//2. 添加Source组件, 从Pulsar中读取消息数据Properties props = new Properties();props.setProperty("topic","persistent://public/default/itcast_ems_tab");props.setProperty("partition.discovery.interval-millis","5000");FlinkPulsarSource<PulsarTopicPojo> pulsarSource = new FlinkPulsarSource<PulsarTopicPojo>("pulsar://node1:6650,node2:6650,node3:6650","http://node1:8080,node2:8080,node3:8080",JsonDeser.of(PulsarTopicPojo.class),props);//2.1 设置pulsarSource组件在消费数据的时候, 默认从什么位置开始消费pulsarSource.setStartFromLatest();DataStreamSource<PulsarTopicPojo> dataStreamSource = env.addSource(pulsarSource);//2.2 转换数据操作: 将 PulsarTopicPojo 转换为ROW对象SingleOutputStreamOperator<Row> rowDataSteam = dataStreamSource.map(new MapFunction<PulsarTopicPojo, Row>() {@Overridepublic Row map(PulsarTopicPojo pulsarTopicPojo) throws Exception {return Row.of(pulsarTopicPojo.getId(), pulsarTopicPojo.getSid(), pulsarTopicPojo.getIp(), pulsarTopicPojo.getCreate_time(),pulsarTopicPojo.getSession_id(), pulsarTopicPojo.getYearInfo(), pulsarTopicPojo.getMonthInfo(), pulsarTopicPojo.getDayInfo(),pulsarTopicPojo.getHourInfo(), pulsarTopicPojo.getSeo_source(), pulsarTopicPojo.getArea(), pulsarTopicPojo.getOrigin_channel(),pulsarTopicPojo.getMsg_count(), pulsarTopicPojo.getFrom_url());}});//2.3: 设置sink操作写入到CK操作String insertSql = "insert into itcast_ck.itcast_ck_ems (id,sid,ip,create_time,session_id,yearInfo,monthInfo,dayInfo,hourInfo,seo_source,area,origin_channel,msg_count,from_url) values(?,?,?,?,?,?,?,?,?,?,?,?,?,?)";JDBCAppendTableSink tableSink = JDBCAppendTableSink.builder().setDrivername("ru.yandex.clickhouse.ClickHouseDriver").setDBUrl("jdbc:clickhouse://node2:8123/itcast_ck").setQuery(insertSql).setBatchSize(1).setParameterTypes(Types.INTEGER,Types.VARCHAR,Types.VARCHAR,Types.VARCHAR,Types.VARCHAR,Types.VARCHAR,Types.VARCHAR,Types.VARCHAR,Types.VARCHAR,Types.VARCHAR,Types.VARCHAR,Types.VARCHAR,Types.INTEGER,Types.VARCHAR).build();tableSink.emitDataStream(rowDataSteam);//3. 提交执行env.execute("itcast_to_ck");}
}
3.9.HBase对接Phoenix实现即席查询
3.9.1.Phoenix安装操作
Phoenix是属于apache旗下的一款基于hbase的工具, 此工具提供一种全新的方式来操作hbase中数据(SQL),
同时Phoenix对hbase进行大量的优化工作, 能够让我们更加有效的操作hbase
整个安装操作, 大家可以参考资料中安装手册, 进行安装即可
3.9.2.在Phoenix中创建表
create view "itcast_h_ems" (
"id" integer primary key,
"f1"."sid" varchar,
"f1"."ip" varchar,
"f1"."create_time" varchar,
"f1"."session_id" varchar,
"f1"."yearInfo" varchar,
"f1"."monthInfo" varchar,
"f1"."dayInfo" varchar,
"f1"."hourInfo" varchar,
"f1"."seo_source" varchar,
"f1"."area" varchar,
"f1"."origin_channel" varchar,
"f1"."msg_count" integer,
"f1"."from_url" varchar
);
3.9.3.在Phoenix中类型说明

相关文章:
15_基于Flink将pulsar数据写入到ClickHouse
3.8.基于Flink将数据写入到ClickHouse 编写Flink完成数据写入到ClickHouse操作, 后续基于CK完成指标统计操作 3.8.1.ClickHouse基本介绍 ClickHouse 是俄罗斯的Yandex于2016年开源的列式存储数据库(DBMS),使用C语言编写,主要用…...
Pycharm如何打断点进行调试?
断点调试,是编写程序中一个很重要的步骤,有些简单的程序使用print语句就可看出问题,而比较复杂的程序,函数和变量较多的情况下,这时候就需要打断点了,更容易定位问题。 一、添加断点 在代码的行标前面&…...
微服务02-docker
1、Docker架构 1.1 镜像和容器 Docker中有几个重要的概念: 镜像(Image):Docker将应用程序及其所需的依赖、函数库、环境、配置等文件打包在一起,称为镜像。Docker镜像是用于创建 Docker 容器的模板 。就像面向对象编…...
CSS:盒子模型 与 多种横向布局方法
目录 盒子模型块级盒子内联级盒子内联块级盒子弹性盒子display 改变模型区域划分text 内容区padding 填充区border 边框区margin 外边距直接设置盒子大小 布局横向布局方法一 float 浮起来方法二 内联块级元素实现方法三 弹性盒子模型 盒子模型 块级盒子 独占一行,…...
用node.js搭建一个视频推流服务
由于业务中有不少视频使用的场景,今天来说说如何使用node完成一个视频推流服务。 先看看效果: 这里的播放的视频是一个多个Partial Content组合起来的,每个Partial Content大小是1M。 一,项目搭建 (1)初…...
【SpringCloud】Feign远程调用
先来看我们以前利用RestTemplate发起远程调用的代码: String url "http://userservice/user/" order.getUserId(); User user restTemplate.getForObject(url, User.class);存在下面的问题: • 代码可读性差,编程体验不统一 • …...
集合Collection-List-ArrayList学习
一、集合 集合是数据容器。相较于数组集合具有以下几个特点: 数组一旦创建,长度不可改变。集合的长度会自动扩容。集合具有很多数组没有的功能函数API数组元素的存储特点单一,不同的集合有不同的存储特点。 1. Collection顶层接口 Collect…...
mybatispuls代码生成器
引入依赖 <?xml version"1.0" encoding"UTF-8"?> <project xmlns"http://maven.apache.org/POM/4.0.0" xmlns:xsi"http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation"http://maven.apache.org/POM/4.0.…...
【设计模式】-代理模式
在软件开发中,经常遇到需要对某个对象进行控制或者监控的场景。而直接修改对象的代码可能使代码变得复杂且难以维护。这时,使用代理模式(Proxy Pattern)可以很好地解决这个问题。 代理模式是一种结构型设计模式,通过引…...
爬虫ip池越大越好吗?
作为一名资深的程序员,今天我要给大家分享一些关于爬虫ip池的知识。关于ip代理池的问题,答案是肯定的,池子越大越好。下面跟我一起来盘点一下ip池大的好处吧! 1、提高稳定性 爬虫ip池越大,意味着拥有更多可用的爬虫ip…...
目标检测常用的数据集格式
在目标检测领域,有三种常用的数据集: 数据集标注文件格式bbox格式vocxmlxmin, ymin, xmax, ymax:bbox左上角(xmin, ymin)和右下角(xmax, ymax)的坐标cocojsonx, y, w, h:bbox左上角坐标(x, y)以及宽(w)和高(h)yolotxtxcenter, ycenter, w, h:bbox的中心…...
chrome插件开发实例03-使用 chrome.storage API永久保存数据
目录 防止数据丢失 使用chrome.storage API 功能 功能演示 源代码 manifest.json popup.html...
Segment Anything(SAM) 计算过程
给定输入图像 I ∈ R 3 H W I \in R^{3 \times H \times W} I∈R3HW。给定需要的prompts: M ∈ R 1 H W M \in R^{1 \times H \times W} M∈R1HW,代表图片的前背景信息。 P ∈ R N 2 P \in R^{N \times 2} P∈RN2,其中 N N N 是点的个数…...
Nacos配置文件读取源码解析
Nacos配置文件读取 本篇文章是探究,springboot启动时nacos是如何将配置中心的配置读取到springboot环境中的 PropertySourceLocator org.springframework.cloud.bootstrap.config.PropertySourceLocator 是 springcloud 定义的一个顶级接口,用来定义所…...
Linux0.11内核源码解析-fcntl.c/iotcl.c/stat.c
fcntl fcntl.c实现了文件控制系统调用fcntl和两个文件句柄描述符的复制系统调用dup()和dup2()。 dup返回当前值最小的未用句柄,dup2返回指定新句柄的数值,句柄的复制操作主要用在文件的标准输入、输出重定向和管道方面。 dupfd 复制文件句柄ÿ…...
OpenStack简介
OpenStack简介 目录 OpenStack简介 1、云计算模式2、云计算 虚拟化 openstack之间的关系?3、OpenStack 中有哪些组件?4、计算节点负责虚拟机运行5、网络节点负责对外网络与内网之间的通信 5.1 网络节点仅包含Neutron服务5.2 网络节点包含三个网络端口6、…...
二分法的应用
文章目录 什么是二分法🎮二分查找的优先级二分查找的步骤💥图解演示🧩 代码演示🫕python程序实现🐈⬛C程序实现🐕🦺C程序实现🐯Java程序实现🐳 非常规类二分查找&…...
ChatGPT在大规模数据处理和信息管理中的应用如何?
ChatGPT作为一种强大的自然语言处理模型,在大规模数据处理和信息管理领域有着广泛的应用潜力。它可以利用其文本生成、文本理解和问答等能力,为数据分析、信息提取、知识管理等任务提供智能化的解决方案。以下将详细介绍ChatGPT在大规模数据处理和信息管…...
【算法篇C++实现】五大常规算法
文章目录 🚀一、分治法⛳(一)算法思想⛳(二)相关代码 🚀二、动态规划算法⛳(一)算法思想⛳(二)相关代码 🚀三、回溯算法⛳(一…...
MySQL和钉钉单据接口对接
MySQL和钉钉单据接口对接 数据源系统:钉钉 钉钉(DingTalk)是阿里巴巴集团打造的企业级智能移动办公平台,是数字经济时代的企业组织协同办公和应用开发平台。钉钉将IM即时沟通、钉钉文档、钉闪会、钉盘、Teambition、OA审批、智能人事、钉工牌…...
前端倒计时误差!
提示:记录工作中遇到的需求及解决办法 文章目录 前言一、误差从何而来?二、五大解决方案1. 动态校准法(基础版)2. Web Worker 计时3. 服务器时间同步4. Performance API 高精度计时5. 页面可见性API优化三、生产环境最佳实践四、终极解决方案架构前言 前几天听说公司某个项…...
3.3.1_1 检错编码(奇偶校验码)
从这节课开始,我们会探讨数据链路层的差错控制功能,差错控制功能的主要目标是要发现并且解决一个帧内部的位错误,我们需要使用特殊的编码技术去发现帧内部的位错误,当我们发现位错误之后,通常来说有两种解决方案。第一…...
Element Plus 表单(el-form)中关于正整数输入的校验规则
目录 1 单个正整数输入1.1 模板1.2 校验规则 2 两个正整数输入(联动)2.1 模板2.2 校验规则2.3 CSS 1 单个正整数输入 1.1 模板 <el-formref"formRef":model"formData":rules"formRules"label-width"150px"…...
Mac下Android Studio扫描根目录卡死问题记录
环境信息 操作系统: macOS 15.5 (Apple M2芯片)Android Studio版本: Meerkat Feature Drop | 2024.3.2 Patch 1 (Build #AI-243.26053.27.2432.13536105, 2025年5月22日构建) 问题现象 在项目开发过程中,提示一个依赖外部头文件的cpp源文件需要同步,点…...
R语言速释制剂QBD解决方案之三
本文是《Quality by Design for ANDAs: An Example for Immediate-Release Dosage Forms》第一个处方的R语言解决方案。 第一个处方研究评估原料药粒径分布、MCC/Lactose比例、崩解剂用量对制剂CQAs的影响。 第二处方研究用于理解颗粒外加硬脂酸镁和滑石粉对片剂质量和可生产…...
「全栈技术解析」推客小程序系统开发:从架构设计到裂变增长的完整解决方案
在移动互联网营销竞争白热化的当下,推客小程序系统凭借其裂变传播、精准营销等特性,成为企业抢占市场的利器。本文将深度解析推客小程序系统开发的核心技术与实现路径,助力开发者打造具有市场竞争力的营销工具。 一、系统核心功能架构&…...
《Docker》架构
文章目录 架构模式单机架构应用数据分离架构应用服务器集群架构读写分离/主从分离架构冷热分离架构垂直分库架构微服务架构容器编排架构什么是容器,docker,镜像,k8s 架构模式 单机架构 单机架构其实就是应用服务器和单机服务器都部署在同一…...
基于 HTTP 的单向流式通信协议SSE详解
SSE(Server-Sent Events)详解 🧠 什么是 SSE? SSE(Server-Sent Events) 是 HTML5 标准中定义的一种通信机制,它允许服务器主动将事件推送给客户端(浏览器)。与传统的 H…...
信息系统分析与设计复习
2024试卷 单选题(20) 1、在一个聊天系统(类似ChatGPT)中,属于控制类的是()。 A. 话语者类 B.聊天文字输入界面类 C. 聊天主题辨别类 D. 聊天历史类 解析 B-C-E备选架构中分析类分为边界类、控制类和实体类。 边界…...
centos挂载目录满但实际未满引发系统宕机
测试服务器应用系统突然挂了,经过排查发现是因为磁盘“满了”导致的,使用df -h查看磁盘使用情况/home目录使用率已经到了100%,但使用du -sh /home查看发现实际磁盘使用还不到1G,推测有进程正在写入或占用已删除的大文件(Linux 系统…...
