当前位置: 首页 > news >正文

探索ClickHouse——连接Kafka和Clickhouse

安装Kafka

新增用户

sudo adduser kafka
sudo adduser kafka sudo
su -l kafka

安装JDK

sudo apt-get install openjdk-8-jre

下载解压kafka

可以从https://downloads.apache.org/kafka/下找到希望安装的版本。需要注意的是,不要下载路径包含src的包,否则会报“Classpath is empty”之类的错误。

mkdir ~/Downloads
curl "https://downloads.apache.org/kafka/3.5.1/kafka_2.13-3.5.1.tgz" -o ~/Downloads/kafka.tgz
mkdir ~/kafka && cd ~/kafka
tar -xvzf ~/Downloads/kafka.tgz --strip 1

配置

配置kafka

vim ~/kafka/config/server.properties

将下面这行加入文件的末尾

# ~/kafka/config/server.properties
delete.topic.enable=true

同时修改log的路径

# ~/kafka/config/server.properties
log.dirs=/home/kafka/logs

创建zookeeper service

sudo vim /etc/systemd/system/zookeeper.service

将下面内容填入上述文件中

[Unit]
Requires=network.target remote-fs.target
After=network.target remote-fs.target[Service]
Type=simple
User=kafka
ExecStart=/home/kafka/kafka/bin/zookeeper-server-start.sh /home/kafka/kafka/config/zookeeper.properties
ExecStop=/home/kafka/kafka/bin/zookeeper-server-stop.sh
Restart=on-abnormal[Install]
WantedBy=multi-user.target

创建kafka service

sudo vim /etc/systemd/system/kafka.service

将下面内容填入上述文件中

[Unit]
Requires=zookeeper.service
After=zookeeper.service[Service]
Type=simple
User=kafka
ExecStart=/bin/sh -c '/home/kafka/kafka/bin/kafka-server-start.sh /home/kafka/kafka/config/server.properties > /home/kafka/kafka/kafka.log 2>&1'
ExecStop=/home/kafka/kafka/bin/kafka-server-stop.sh
Restart=on-abnormal[Install]
WantedBy=multi-user.target

启动kafka#

启动服务

sudo systemctl start kafka

查看状态

sudo systemctl status kafka

● kafka.service
Loaded: loaded (/etc/systemd/system/kafka.service; enabled; vendor preset: enabled)
Active: active (running) since Thu 2023-09-28 03:09:39 UTC; 4s ago
Main PID: 3561758 (sh)
Tasks: 42 (limit: 2143)
Memory: 292.4M
CPU: 2.768s
CGroup: /system.slice/kafka.service
├─3561758 /bin/sh -c “/home/kafka/kafka/bin/kafka-server-start.sh /home/kafka/kafka/config/server.properties > /home/kafka/kafka/kafka.log 2>&1”
└─3561760 java -Xmx1G -Xms1G -server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+ExplicitGCInvokesConcurrent -XX:MaxInlineLevel=15 -Djava.awt.headless=true -Xloggc:/>
Sep 28 03:09:39 ubuntua systemd[1]: Started kafka.service.

可以看到kafka已经处于running状态。

测试

创建Topic

~/kafka/bin/kafka-topics.sh --create --bootstrap-server localhost:2181 --replication-factor 1 --partitions 1 --topic TutorialTopic

发送消息

echo "Hello, World" | ~/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic TutorialTopic > /dev/null

订阅Topic

新启动一个界面,执行下面命令

~/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic TutorialTopic --from-beginning

它会收到上面发的消息

Hello, World

连接

创建表

使用kafka engine将kafka中的流映射到一个表中。我们以《探索ClickHouse——使用Projection加速查询》中的数据为例。

clickhouse-client --stream_like_engine_allow_direct_select 1
CREATE TABLE uk_price_paid_from_kafka (uuid_string String, price_string String, time String, postcode String, a String, b String, c String, addr1 String, addr2 String, street String, locality String, town String, district String, county String, d String, e String) ENGINE = Kafka SETTINGS kafka_broker_list = 'localhost:9092', kafka_topic_list='TutorialTopic', kafka_group_name='clickhouse', kafka_format='CSV', kafka_skip_broken_messages=1, kafka_num_consumers=1;

CREATE TABLE uk_price_paid_from_kafka
(
uuid_string String,
price_string String,
time String,
postcode String,
a String,
b String,
c String,
addr1 String,
addr2 String,
street String,
locality String,
town String,
district String,
county String,
d String,
e String
)
ENGINE = Kafka
SETTINGS kafka_broker_list = ‘localhost:9092’, kafka_topic_list = ‘TutorialTopic’, kafka_group_name = ‘clickhouse’, kafka_format = ‘CSV’, kafka_skip_broken_messages = 1, kafka_num_consumers = 1
Query id: 07a063e9-6a61-42c0-8fec-1fe2f119ee28
Ok.
0 rows in set. Elapsed: 0.008 sec.

给kafka发送消息

~/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic TutorialTopic

进入消息输入模式,发送下面两个消息

"{F887F88E-7D15-4415-804E-52EAC2F10958}","70000","1995-07-07 00:00","MK15 9HP","D","N","F","31","","ALDRICH DRIVE","WILLEN","MILTON KEYNES","MILTON KEYNES","MILTON KEYNES","A","A"
"{40FD4DF2-5362-407C-92BC-566E2CCE89E9}","44500","1995-02-03 00:00","SR6 0AQ","T","N","F","50","","HOWICK PARK","SUNDERLAND","SUNDERLAND","SUNDERLAND","TYNE AND WEAR","A","A"

在这里插入图片描述

Clickhouse收到消息

在clickhouse-client交互终端中执行下面指令:

select * from uk_price_paid_from_kafka;

在这里插入图片描述
可以看到之前发送给kafka Topic的内容在Clickhouse中被收到了。

问题

后面我再在clickhouse-client交互终端中查询不到数据了。即使我们给kafka该主题发消息,也查询不到。后面我们再将《探索ClickHouse——使用MaterializedView存储kafka传递的数据》中讲解使用MaterializedView清洗和固化kafka的数据。

参考资料

  • https://openjdk.org/install/
  • https://kafka.apache.org/quickstart
  • https://www.digitalocean.com/community/tutorials/how-to-install-apache-kafka-on-ubuntu-20-04#step-2-mdash-downloading-and-extracting-the-kafka-binaries
  • https://cloud.tencent.com/developer/article/1892086
  • https://sineyuan.github.io/post/clickhouse-kafka/

相关文章:

探索ClickHouse——连接Kafka和Clickhouse

安装Kafka 新增用户 sudo adduser kafka sudo adduser kafka sudo su -l kafka安装JDK sudo apt-get install openjdk-8-jre下载解压kafka 可以从https://downloads.apache.org/kafka/下找到希望安装的版本。需要注意的是,不要下载路径包含src的包,否…...

基于监督学习的多模态MRI脑肿瘤分割,使用来自超体素的纹理特征(Matlab代码实现)

💥💥💞💞欢迎来到本博客❤️❤️💥💥 🏆博主优势:🌞🌞🌞博客内容尽量做到思维缜密,逻辑清晰,为了方便读者。 ⛳️座右铭&a…...

【RocketMQ】(八)Rebalance负载均衡

消费者负载均衡,是指为消费组下的每个消费者分配订阅主题下的消费队列,分配了消费队列消费者就可以知道去消费哪个消费队列上面的消息,这里针对集群模式,因为广播模式,所有的消息队列可以被消费组下的每个消费者消费不…...

线性筛和埃氏筛

线性筛&#xff1a; #define _CRT_SECURE_NO_WARNINGS #include<iostream> #include<cstdio> #include<cstdlib> #include<string> #include<cstring> #include<cmath> #include<ctime> #include<algorithm> #include<ut…...

【Java 进阶篇】JDBC ResultSet 类详解

在Java应用程序中&#xff0c;与数据库交互通常涉及执行SQL查询以检索数据。一旦执行查询&#xff0c;您将获得一个ResultSet对象&#xff0c;该对象包含查询结果的数据。本文将深入介绍ResultSet类&#xff0c;它是Java JDBC编程中的一个核心类&#xff0c;用于处理查询结果。…...

Centos7常用服务脚本(.service)

Centos7常用服务脚本&#xff08;.service&#xff09; 注意&#xff1a;[Service]中配置路径必须使用绝对路径。 启停&#xff1a; systemctl { start | stop | restart | reload } xxx.service 自启动&#xff1a; systemctl { enable | disable } xxx.service nginx.se…...

MySQL 视图View的SQL语法和更新(视图篇 二)

视图语法基本操作 创建 -- [ ]表示可选 create [or replace] view 视图名称[(列名列表)] as select语句 [ with [cascaded | local ] check option ]; 添加&#xff08;虽然视图是虚拟表&#xff0c;但是向视图操作的数据实际上会影响到实际关联的表数据&#xff09; -- 视图添…...

38 翻转二叉树

翻转二叉树 理解题意&#xff0c;翻转即每个结点的左右子树翻转/对调题解1 递归——自下而上题解2 迭代——自上而下 给你一棵二叉树的根节点 root &#xff0c;翻转这棵二叉树&#xff0c;并返回其根节点。 提示&#xff1a; 树中节点数目范围在 [0, 100] 内-100 < Node.…...

数据结构-快速排序-C语言实现

引言&#xff1a;快速排序作为一种非常经典且高效的排序算法&#xff0c;无论是工作还是面试中广泛用到&#xff0c;作为一种分治思想&#xff0c;需要熟悉递归思想。下面来讲讲快速排序的实现和改进。 老规矩&#xff0c;先用图解来理解一下&#xff1a;&#xff08;这里使用快…...

玩客云Armbian_23.08.0-trunk_Onecloud_bookworm_edge_6.4.14.burn配置

固定IP # interface file auto-generated by buildrootauto lo iface lo inet loopback// 上面是默认的内容,下面是新增的内容,上下之间需要一个空行隔开 // 接口顶格写,属性的前面有一个tab的缩进 # The primary network interfaceauto eth0 iface eth0 inet staticaddress 1…...

Nginx查找耗时的接口

Nginx查找耗时的接口 # grep 是筛选的域名 awk中的$5是判断的状态码 sort中的15是指的upstream_response_time 当然也可以统计request_time的时间cat access.log | grep zhhll.icu | awk $5 200{print $0} | sort -k 15 -n -r | head -10 https://zhhll.icu/2021/linux/实…...

C++ Primer 一 变量和基本类型

本章讲解C内置的数据类型&#xff08;如&#xff1a;字符、整型、浮点数等&#xff09;和自定义数据类型的机制。下一章讲解C标准库里面定义的更加复杂的数据类型&#xff0c;比如可变长字符串和向量等。 1.基本内置类型 C内置的基本类型包括&#xff1a;算术类型和空类型。算…...

实体行业数字化转型怎么做?线上线下相结合的新零售体系怎么做?

如今&#xff0c;实体行业想要取得收入增长&#xff0c;只做线下业务或者只做线上业务&#xff0c;在当前的市场环境中是难以长久生存的&#xff0c;因此一定要线上线下相结合&#xff0c;将流量运作与线下转化进行充分结合&#xff0c;才能更好地发挥实体优势&#xff0c;带来…...

JAVA面经整理(5)

创建线程池不是说现用先创建&#xff0c;而是要是可以复用线程池中的线程&#xff0c;就很好地避免了大量用户态和内核态的交互&#xff0c;不需要频繁的创建和销毁线程 一)什么是池化技术&#xff1f;什么是线程池&#xff1f; 1)池化技术是提前准备好一些资源&#xff0c;在…...

【牛客网-面试必刷TOP101】二分查找题目

目录 二维数组中的查找_牛客题霸_牛客网 (nowcoder.com) 寻找峰值_牛客题霸_牛客网 (nowcoder.com) 数组中的逆序对_牛客题霸_牛客网 (nowcoder.com) 旋转数组的最小数字_牛客题霸_牛客网 (nowcoder.com) 二维数组中的查找_牛客题霸_牛客网 (nowcoder.com) 题意&#xff1a…...

【QT】自定义组件ui类添加到主ui界面方法

1.添加自定义组件到项目中 add new选择如下 写好类方法&#xff0c;确定即可 2.将新创建的ui类加入到主ui界面 选中新创建ui类的父类空块&#xff0c;右键选择提升为 选择并添加新创建的类...

FFmpeg 多图片合成视频带字幕和音乐+特效(淡入淡出,圆圈黑色淡出)

FFmpeg 多图片合成视频带字幕和音乐+特效(淡入淡出,圆圈黑色淡出) 效果图1. 报错及解决2. xfade、xfade_opeccl 特效切换3. ffmpeg命令行详解4. 源码4.1 auto.bash4.2 geneFade.py4.3 python moviepy合并视频及音频按照(视频长度截取对应的音频在合并)4.4 命令行记录参考这…...

上网Tips: Linux截取动态效果图工具_byzanz

链接1 链接2 安装&#xff1a; sudo apt-get install byzanz 查看指令 说明 byzanz-record --help日常操作 xwininfo点击 待录制窗口 左上角 byzanz-record -x 72 -y 64 -w 1848 -h 893 -d 10 --delay5 -c /home/xixi/myGIF/test.gif小工具 获取鼠标坐标 xdotool getm…...

下载盗版网站视频并将.ts视频文件合并

. 1.分析视频请求123 2.数据获取和拼接 1.分析视频请求 1 通过抓包观察我们发现视频是由.ts文件拼接成的每一个.ts文件代表一小段2 通过观察0.ts和1.ts的url我们发现他们只有最后一段不同我们网上找到url获取的包3 我们发现index.m3u8中储存着所有的.ts文件名在拼接上前面固定…...

ElasticSearch - 基于 拼音分词器 和 IK分词器 模拟实现“百度”搜索框自动补全功能

目录 一、自动补全 1.1、效果说明 1.2、安装拼音分词器 1.3、自定义分词器 1.3.1、为什么要自定义分词器 1.3.2、分词器的构成 1.3.3、自定义分词器 1.3.4、面临的问题和解决办法 问题 解决方案 1.4、completion suggester 查询 1.4.1、基本概念和语法 1.4.2、示例…...

【kubernetes】kubernetes中的调度

1 调度过程 调度的本来含义是指决定某个任务交给某人来做的过程&#xff0c;kubernetes中的调度是指决定Pod在哪个Node上运行。 k8s的调度分为2个过程&#xff1a; 预选&#xff1a;去掉不满足条件的节点优选&#xff1a;对剩下符合条件的节点按照一些策略进行排序&#xff…...

java读取csv文件或者java读取字符串,找出引号内容,采用正则表达式书写

将一个csv文件复制出来将后缀改变为txt,我们就得到了一个文件文件打开这个txt文件&#xff0c;可以看到每一个字段之间都是用英文逗号隔开 正常的内容形似 20,C4,Pm,tem,tion,21,A4,E,H,"1,2,3,NA,aaa,bbbb,cccc,ddd,N/A,aaa,bbbb,cccc,ddd,tttttt对于这种我们只需要进行…...

【寻找关键钥匙】python实现-附ChatGPT解析

1.题目 寻找关键钥匙 知识点字符串、编程基础、正则表达式、排序 时间限制:1s 空间限制: 256MB 限定语言:不限 题目描述: 小强正在参加《密室逃生》游戏,当前关卡要求找到符合给定 密码K(升序的不重复小写字母组成)的箱子,并给出箱子编号,箱子编号为1~N。 每个箱子中都有一个…...

基于 QT 实现一个 Ikun 专属桌面宠物

Step0、实现思路 想到的思路有两种&#xff1a; 1、使用 QT 的状态机模式&#xff0c;参考官网文档&#xff0c;这个模式的解耦最佳 2、使用原生 Wigets&#xff0c;将窗口设置为透明无框&#xff0c;循环播放桌面宠物的状态 本文采用第二种思路&#xff0c;实现一个极简版…...

新闻报道的未来:自动化新闻生成与爬虫技术

概述 自动化新闻生成是一种利用自然语言处理和机器学习技术&#xff0c;从结构化数据中提取信息并生成新闻文章的方法。它可以实现大规模、高效、多样的新闻内容生产。然而&#xff0c;要实现自动化新闻生成&#xff0c;首先需要获取可靠的数据源。这就需要使用爬虫技术&#…...

C++ 并发编程实战 第八章 设计并发代码 二

目录 8.3 设计数据结构以提升多线程程序的性能 8.3.1 针对复杂操作的数据划分 8.3.2 其他数据结构的访问模式 8.4 设计并发代码时要额外考虑的因素 8.4.1 并行算法代码中的异常安全 8.4.2 可扩展性和Amdahl定律 8.4.3 利用多线程隐藏等待行为 8.4.4 借并发特性改进响应…...

list(链表)

文章目录 功能迭代器的分类sort函数&#xff08;排序&#xff09;merage&#xff08;归并&#xff09;unique(去重&#xff09;removesplice&#xff08;转移&#xff09; 功能 这里没有“[]"的实现&#xff1b;原因&#xff1a;实现较麻烦&#xff1b;这里使用迭代器来实…...

使用代理IP进行安全高效的竞争情报收集,为企业赢得竞争优势

在激烈的市场竞争中&#xff0c;知己知彼方能百战百胜。竞争对手的信息对于企业来说至关重要&#xff0c;它提供了洞察竞争环境和市场的窗口。在这个信息时代&#xff0c;代理IP是一种实用的工具&#xff0c;可以帮助企业收集竞争对手的产品信息和营销活动数据&#xff0c;为企…...

【数学知识】一些数学知识,以供学习

矩阵的特征值和特征向量 https://zhuanlan.zhihu.com/p/104980382 矩阵的逆 https://zhuanlan.zhihu.com/p/163748569 对数似然方程(log-likelihood equation)&#xff0c;简称“似然方程”: https://baike.baidu.com/item/%E5%AF%B9%E6%95%B0%E4%BC%BC%E7%84%B6%E6%96%B9%E7…...

JKChangeCapture swift 版本的捕捉属性变化的工具

在OC的时代里&#xff0c;大家捕捉属性的变化通常是通过KVO机制来实现的&#xff0c;KVO把所有的属性变化都放在了一个方法进行相应处理&#xff0c;并不友好&#xff0c;之前基于KVO的机制实现了一套属性变化工具JKKVOHelper,这里不就在过多介绍这个了&#xff0c;在swift的时…...

网站制作中文版/百度关键词搜索次数

原题地址&#xff1a;http://oj.leetcode.com/problems/insertion-sort-list/ 题意&#xff1a;对链表进行插入排序。 解题思路&#xff1a;首先来对插入排序有一个直观的认识&#xff0c;来自维基百科。 代码循环部分图示&#xff1a; 代码&#xff1a; class Solution: # par…...

有没有做机械加工的网站/网站策划方案案例

copy过来有些图没了&#xff0c;见这里第二章 - 系统模型分3大块physical mode&#xff08;只考虑物理连接方面的模型&#xff09;&#xff1b;architechture mode&#xff08;主要是“计算”方面的体系结构模型&#xff0c;例如peer to peer、client to server&#xff09;fun…...

网站建设和网站优化的区别/微信信息流广告投放

近日一好友去阿里面试&#xff0c;面试失败了&#xff0c;分享了一个他最不擅长的算法面试题。题目是这样的。 题目&#xff1a;给定一个二叉搜索树(BST)&#xff0c;找到树中第 K 小的节点。 出题人&#xff1a;阿里巴巴出题专家&#xff1a;文景&#xff0f;阿里云 CDN 资深…...

wordpress ajax 插件/市场调研报告模板范文

目录介绍安装介绍 根据wikipedia的介绍&#xff1a; Docker 是一个开放源代码软件项目&#xff0c;让应用程序布署在软件容器下的工作可以自动化进行&#xff0c;借此在 Linux 操作系统上&#xff0c;提供一个额外的软件抽象层&#xff0c;以及操作系统层虚拟化的自动管理机制…...

做网站教程视频/做引流推广的平台

.数据结构试验——迷宫问题(一)基本问题1.问题描述这是心理学中的一个经典问题。心理学家把一只老鼠从一个无顶盖的大盒子的入口处放入&#xff0c;让老鼠自行找到出口出来。迷宫中设置很多障碍阻止老鼠前行&#xff0c;迷宫唯一的出口处放有一块奶酪&#xff0c;吸引老鼠找到出…...

wordpress 商品模板下载/互联网营销师考证多少钱

用户鼠标移入时&#xff0c;有弹出框出现&#xff0c;这样的需求很常见。这在处理HTML元素实现时简单&#xff0c;但是如果是对HTML5 Canvas 构成的图形进行处理&#xff0c;这种方法不再适用&#xff0c;因为Canvas使用的是另外一套机制&#xff0c;无论在Canvas上绘制多少图形…...