ClickHouse(21)ClickHouse集成Kafka表引擎详细解析
文章目录
- Kafka表集成引擎
- 配置
- Kerberos 支持
- 虚拟列
- 资料分享
- 参考文章
Kafka表集成引擎
此引擎与Apache Kafka结合使用。
Kafka 特性:
- 发布或者订阅数据流。
- 容错存储机制。
- 处理流数据。
老版Kafka集成表引擎参数格式:
Kafka(kafka_broker_list, kafka_topic_list, kafka_group_name, kafka_format[, kafka_row_delimiter, kafka_schema, kafka_num_consumers])
新版Kafka集成表引擎参数格式:
Kafka SETTINGSkafka_broker_list = 'localhost:9092',kafka_topic_list = 'topic1,topic2',kafka_group_name = 'group1',kafka_format = 'JSONEachRow',kafka_row_delimiter = '\n',kafka_schema = '',kafka_num_consumers = 2
必要参数:
kafka_broker_list– 以逗号分隔的 brokers 列表 (localhost:9092)。kafka_topic_list– topic 列表 (my_topic)。kafka_group_name– Kafka 消费组名称 (group1)。如果不希望消息在集群中重复,请在每个分片中使用相同的组名。kafka_format– 消息体格式。使用与 SQL 部分的FORMAT函数相同表示方法,例如JSONEachRow。
可选参数:
kafka_row_delimiter- 每个消息体(记录)之间的分隔符。kafka_schema– 如果解析格式需要一个 schema 时,此参数必填。kafka_num_consumers– 单个表的消费者数量。默认值是:1,如果一个消费者的吞吐量不足,则指定更多的消费者。消费者的总数不应该超过 topic 中分区的数量,因为每个分区只能分配一个消费者。
ClickHouse可以接受和返回各种格式的数据。受支持的输入格式可用于提交给INSERT语句、从文件表(File,URL,HDFS或者外部目录)执行SELECT语句,受支持的输出格式可用于格式化SELECT语句的返回结果,或者通过INSERT写入到文件表。
以下kafka_format是支持的格式,ClickHouse可以接受和返回各种格式的数据。受支持的输入格式可用于提交给INSERT语句、从文件表(File,URL,HDFS或者外部目录)执行SELECT语句,受支持的输出格式可用于格式化SELECT语句的返回结果,或者通过INSERT写入到文件表。
| 格式 | 输入 | 输出 |
|---|---|---|
| [TabSeparated] | ✔ | ✔ |
| [TabSeparatedRaw] | ✔ | ✔ |
| [TabSeparatedWithNames] | ✔ | ✔ |
| [TabSeparatedWithNamesAndTypes] | ✔ | ✔ |
| [Template] | ✔ | ✔ |
| [TemplateIgnoreSpaces] | ✔ | ✗ |
| [CSV] | ✔ | ✔ |
| [CSVWithNames] | ✔ | ✔ |
| [CustomSeparated] | ✔ | ✔ |
| [Values] | ✔ | ✔ |
| [Vertical] | ✗ | ✔ |
| [JSON] | ✗ | ✔ |
| [JSONAsString] | ✔ | ✗ |
| [JSONStrings] | ✗ | ✔ |
| [JSONCompact] | ✗ | ✔ |
| [JSONCompactStrings] | ✗ | ✔ |
| [JSONEachRow] | ✔ | ✔ |
| [JSONEachRowWithProgress] | ✗ | ✔ |
| [JSONStringsEachRow] | ✔ | ✔ |
| [JSONStringsEachRowWithProgress] | ✗ | ✔ |
| [JSONCompactEachRow] | ✔ | ✔ |
| [JSONCompactEachRowWithNamesAndTypes] | ✔ | ✔ |
| [JSONCompactStringsEachRow] | ✔ | ✔ |
| [JSONCompactStringsEachRowWithNamesAndTypes] | ✔ | ✔ |
| [TSKV] | ✔ | ✔ |
| [Pretty] | ✗ | ✔ |
| [PrettyCompact] | ✗ | ✔ |
| [PrettyCompactMonoBlock] | ✗ | ✔ |
| [PrettyNoEscapes] | ✗ | ✔ |
| [PrettySpace] | ✗ | ✔ |
| [Protobuf] | ✔ | ✔ |
| [ProtobufSingle] | ✔ | ✔ |
| [Avro] | ✔ | ✔ |
| [AvroConfluent] | ✔ | ✗ |
| [Parquet] | ✔ | ✔ |
| [Arrow] | ✔ | ✔ |
| [ArrowStream] | ✔ | ✔ |
| [ORC] | ✔ | ✔ |
| [RowBinary] | ✔ | ✔ |
| [RowBinaryWithNamesAndTypes] | ✔ | ✔ |
| [Native] | ✔ | ✔ |
| [Null] | ✗ | ✔ |
| [XML] | ✗ | ✔ |
| [CapnProto] | ✔ | ✗ |
| [LineAsString] | ✔ | ✗ |
| [Regexp] | ✔ | ✗ |
| [RawBLOB] | ✔ | ✔ |
示例:
CREATE TABLE queue (timestamp UInt64,level String,message String) ENGINE = Kafka('localhost:9092', 'topic', 'group1', 'JSONEachRow');SELECT * FROM queue LIMIT 5;CREATE TABLE queue2 (timestamp UInt64,level String,message String) ENGINE = Kafka SETTINGS kafka_broker_list = 'localhost:9092',kafka_topic_list = 'topic',kafka_group_name = 'group1',kafka_format = 'JSONEachRow',kafka_num_consumers = 4;CREATE TABLE queue2 (timestamp UInt64,level String,message String) ENGINE = Kafka('localhost:9092', 'topic', 'group1')SETTINGS kafka_format = 'JSONEachRow',kafka_num_consumers = 4;
消费的消息会被自动追踪,因此每个消息在不同的消费组里只会记录一次。如果希望获得两次数据,则使用另一个组名创建副本。
消费组可以灵活配置并且在集群之间同步。例如,如果群集中有10个主题和5个表副本,则每个副本将获得2个主题。 如果副本数量发生变化,主题将自动在副本中重新分配。
SELECT 查询对于读取消息并不是很有用(调试除外),因为每条消息只能被读取一次。使用物化视图创建实时线程更实用。您可以这样做:
- 使用引擎创建一个 Kafka 消费者并作为一条数据流。
- 创建一个结构表。
- 创建物化视图,改视图会在后台转换引擎中的数据并将其放入之前创建的表中。
当 MATERIALIZED VIEW 添加至引擎,它将会在后台收集数据。可以持续不断地从 Kafka 收集数据并通过 SELECT 将数据转换为所需要的格式。
示例:
CREATE TABLE queue (timestamp UInt64,level String,message String) ENGINE = Kafka('localhost:9092', 'topic', 'group1', 'JSONEachRow');CREATE TABLE daily (day Date,level String,total UInt64) ENGINE = SummingMergeTree(day, (day, level), 8192);CREATE MATERIALIZED VIEW consumer TO dailyAS SELECT toDate(toDateTime(timestamp)) AS day, level, count() as totalFROM queue GROUP BY day, level;SELECT level, sum(total) FROM daily GROUP BY level;
为了提高性能,接受的消息被分组为max_insert_block_size大小的块。如果未在stream_flush_interval_ms毫秒内形成块,则不关心块的完整性,都会将数据刷新到表中。
停止接收主题数据或更改转换逻辑,请 detach 物化视图:
DETACH TABLE consumer;ATTACH TABLE consumer;
如果使用 ALTER 更改目标表,为了避免目标表与视图中的数据之间存在差异,推荐停止物化视图。
配置
与 GraphiteMergeTree 类似,Kafka 引擎支持使用ClickHouse配置文件进行扩展配置。可以使用两个配置键:全局 (kafka) 和 主题级别 (kafka_*)。首先应用全局配置,然后应用主题级配置(如果存在)。
<!-- Global configuration options for all tables of Kafka engine type --><kafka><debug>cgrp</debug><auto_offset_reset>smallest</auto_offset_reset></kafka><!-- Configuration specific for topic "logs" --><kafka_logs><retry_backoff_ms>250</retry_backoff_ms><fetch_min_bytes>100000</fetch_min_bytes></kafka_logs>
在ClickHouse配置中使用下划线 (_) ,并不是使用点 (.)。例如,check.crcs=true 将是 <check_crcs>true</check_crcs>。
Kerberos 支持
对于使用了kerberos的kafka, 将security_protocol 设置为sasl_plaintext就够了,如果kerberos的ticket是由操作系统获取和缓存的。
clickhouse也支持自己使用keyfile的方式来维护kerbros的凭证。配置sasl_kerberos_service_name、sasl_kerberos_keytab、sasl_kerberos_principal三个子元素就可以。
示例:
<!-- Kerberos-aware Kafka --><kafka><security_protocol>SASL_PLAINTEXT</security_protocol><sasl_kerberos_keytab>/home/kafkauser/kafkauser.keytab</sasl_kerberos_keytab><sasl_kerberos_principal>kafkauser/kafkahost@EXAMPLE.COM</sasl_kerberos_principal></kafka>
虚拟列
_topic– Kafka 主题。_key– 信息的键。_offset– 消息的偏移量。_timestamp– 消息的时间戳。_timestamp_ms– 消息的时间戳(毫秒)。_partition– Kafka 主题的分区。
资料分享
ClickHouse经典中文文档分享
参考文章
- ClickHouse(01)什么是ClickHouse,ClickHouse适用于什么场景
- ClickHouse(02)ClickHouse架构设计介绍概述与ClickHouse数据分片设计
- ClickHouse(03)ClickHouse怎么安装和部署
- ClickHouse(04)如何搭建ClickHouse集群
- ClickHouse(05)ClickHouse数据类型详解
- ClickHouse(06)ClickHouse建表语句DDL详细解析
- ClickHouse(07)ClickHouse数据库引擎解析
- ClickHouse(08)ClickHouse表引擎概况
- ClickHouse(09)ClickHouse合并树MergeTree家族表引擎之MergeTree详细解析
- ClickHouse(10)ClickHouse合并树MergeTree家族表引擎之ReplacingMergeTree详细解析
- ClickHouse(11)ClickHouse合并树MergeTree家族表引擎之SummingMergeTree详细解析
- ClickHouse(12)ClickHouse合并树MergeTree家族表引擎之AggregatingMergeTree详细解析
- ClickHouse(13)ClickHouse合并树MergeTree家族表引擎之CollapsingMergeTree详细解析
- ClickHouse(14)ClickHouse合并树MergeTree家族表引擎之VersionedCollapsingMergeTree详细解析
- ClickHouse(15)ClickHouse合并树MergeTree家族表引擎之GraphiteMergeTree详细解析
- ClickHouse(16)ClickHouse日志引擎Log详细解析
- ClickHouse(17)ClickHouse集成JDBC表引擎详细解析
- ClickHouse(18)ClickHouse集成ODBC表引擎详细解析
- ClickHouse(19)ClickHouse集成Hive表引擎详细解析
- ClickHouse(20)ClickHouse集成PostgreSQL表引擎详细解析
相关文章:
ClickHouse(21)ClickHouse集成Kafka表引擎详细解析
文章目录 Kafka表集成引擎配置Kerberos 支持 虚拟列 资料分享参考文章 Kafka表集成引擎 此引擎与Apache Kafka结合使用。 Kafka 特性: 发布或者订阅数据流。容错存储机制。处理流数据。 老版Kafka集成表引擎参数格式: Kafka(kafka_broker_list, kaf…...
JSP-概念
一、引子 很多读者可能听过JSP,并且知道这是一门过时的技术了。在Spring,SpringBoot已经成为主流的今天,笔者为什么还要介绍JSP的相关内容呢?笔者常常提到一个概念:理解一门技术,要理解这个技术为什么产生…...
sqlite插入语句id自增列问题
sqlite给主键id设置AUTOINCREMENT自增在插入数据的时候报错table has x columns but x-1 values were supplied 为什么自增列要显示不提供,sqlite需要提供自增列table ResTools has 7 columns but 6 values were supplied SQL Statement:insert into ResTools values(管理系统w…...
C#,字符串匹配(模式搜索)AC(Aho Corasick)算法的源代码
Aho-Corasick算法简称AC算法,也称为AC自动机(Aho-Corasick)算法,1975年产生于贝尔实验室(The Bell Labs),是一种用于解决多模式字符串匹配的经典算法之一。 the Bell Lab 本文的运行效果: AC算法以模式树…...
【网络取证篇】Windows终端无法使用ping命令解决方法
【网络取证篇】Windows终端无法使用ping命令解决方法 以Ping命令为例,最近遇到ping命令无法使用的情况,很多情况都是操作系统"环境变量"被改变或没有正确配置导致—【蘇小沐】 目录 1、实验环境(一)无法ping命令 &a…...
electron+vue网页直接播放RTSP视频流?
目前大部分摄像头都支持RTSP协议,但是在浏览器限制,最新版的浏览器都不能直接播放RTSP协议,Electron 桌面应用是基于 Chromium 内核的,所以也不能直接播放RTSP,但是我们又有这个需求怎么办呢? 市场上的方案…...
【Delphi 基础知识 19】Assigned的用法
在Delphi中,Assigned 是一个用于检查指针是否已分配内存的函数。它通常用于检查对象或指针是否已经被分配内存,以避免在未分配内存的情况下引用或操作它。 以下是 Assigned 的一些用法示例: 检查对象是否已分配内存: varMyObject…...
多线程在编程中的重要性有什么?并以LabVIEW为例进行说明
多线程在编程中的重要性体现在以下几个方面: 并行处理: 多线程允许程序同时执行多个任务,这在现代多核心处理器上尤其重要。通过并行处理,可以显著提高程序的执行效率和响应速度。 资源利用最大化: 通过多线程&#x…...
K8S---kubectl top
一、简介 该命令类似于linux–top命令,用于显示node和pod的CPU和内存使用情况 二、命令行 1、help命令 k top --help Display resource (CPU/memory) usage. The top command allows you to see the resource consumption for nodes or pods. This command requires Metri…...
Linux部署前后端项目
部署SpringBoot项目 创建SpringBoot项目 先确保有一个可以运行的springboot项目,这里就记录创建项目的流程了,可以自行百度。 命令行启动 2.1、在linux中,我是在data目录下新创建的一个project目录(此目录创建位置不限制&…...
一文搞懂系列——Linux C线程池技术
背景 最近在走读诊断项目代码时,发现其用到了线程池技术,感觉耳目一新。以前基本只是听过线程池,但是并没有实际应用。对它有一丝的好奇,于是趁这个机会深入了解一下线程池的实现原理。 线程池的优点 线程池出现的背景…...
stable diffusion代码学习笔记
前言:本文没有太多公式推理,只有一些简单的公式,以及公式和代码的对应关系。本文仅做个人学习笔记,如有理解错误的地方,请指出。 本文包含stable diffusion入门文献和不同版本的代码。 文献资源 本文学习的代码&…...
腾讯云服务器怎么买?两种购买方式更省钱
腾讯云服务器购买流程很简单,有两种购买方式,直接在官方活动上购买比较划算,在云服务器CVM或轻量应用服务器页面自定义购买价格比较贵,但是自定义购买云服务器CPU内存带宽配置选择范围广,活动上购买只能选择固定的活动…...
基于SpringBoot自定义控制是否需要开启定时功能
在基于SpringBoot的开发过程中,有时候会在应用中使用定时任务,然后服务器上启动定时任务,本地就不需要开启定时任务,使用一个参数进行控制,通过查资料得知非常简单。 参数配置 在application-dev.yml中加入如下配置 …...
“确定要在不复制其属性的情况下复制此文件?”解决方案(将U盘格式由FAT格式转换为NTFS格式)
文章目录 1.问题描述2.问题分析3.问题解决3.1 方法一3.2 方法二3.3 方法三 1.问题描述 从电脑上复制文件到U盘里会出现“确定要在不复制其属性的情况下复制此文件?”提示。 2.问题分析 如果这个文件在NTFS分区上,且存在特殊的安全属性。那么把它从NT…...
视频监控系统EasyCVR如何通过调用API接口查询和下载设备录像?
智慧安防平台EasyCVR是基于各种IP流媒体协议传输的视频汇聚和融合管理平台。视频流媒体服务器EasyCVR采用了开放式的网络结构,支持高清视频的接入和传输、分发,平台提供实时远程视频监控、视频录像、录像回放与存储、告警、语音对讲、云台控制、平台级联…...
15.鸿蒙HarmonyOS App(JAVA)进度条与圆形进度条
15.鸿蒙HarmonyOS App(JAVA)进度条与圆形进度条 progressBar2.setIndeterminate(true);//设置无限模式,运行查看动态效果 //创建并设置无限模式元素 ShapeElement element new ShapeElement(); element.setBounds(0,0,50,50); element.setRgbColor(new RgbColor(255,0,0)); …...
【FastAPI】路径参数
路径参数 from fastapi import FastAPIapp FastAPI()app.get("/items/{item_id}") async def read_item(item_id):return {"item_id": item_id}其中{item_id}就为路径参数 运行以上程序当访问 :http://127.0.0.1:8000/items/fastapi时候 将会…...
【docker笔记】DockerFile
DockerFile Docker镜像结构的分层 镜像不是一个单一的文件,而是有多层构成。 容器其实是在镜像的最上面加了一层读写层,在运行容器里做的任何文件改动,都会写到这个读写层。 如果删除了容器,也就是删除了其最上面的读写层&…...
React项目搭建流程
第一步 利用脚手架创建ts类型的react项目: 执行如下的命令:create-react-app myDemo --template typescript ; 第二步 清理项目目录结构: src/ index.tsx, app.txs, react-app-env.d.ts public/index.ht…...
利用最小二乘法找圆心和半径
#include <iostream> #include <vector> #include <cmath> #include <Eigen/Dense> // 需安装Eigen库用于矩阵运算 // 定义点结构 struct Point { double x, y; Point(double x_, double y_) : x(x_), y(y_) {} }; // 最小二乘法求圆心和半径 …...
谷歌浏览器插件
项目中有时候会用到插件 sync-cookie-extension1.0.0:开发环境同步测试 cookie 至 localhost,便于本地请求服务携带 cookie 参考地址:https://juejin.cn/post/7139354571712757767 里面有源码下载下来,加在到扩展即可使用FeHelp…...
使用VSCode开发Django指南
使用VSCode开发Django指南 一、概述 Django 是一个高级 Python 框架,专为快速、安全和可扩展的 Web 开发而设计。Django 包含对 URL 路由、页面模板和数据处理的丰富支持。 本文将创建一个简单的 Django 应用,其中包含三个使用通用基本模板的页面。在此…...
OkHttp 中实现断点续传 demo
在 OkHttp 中实现断点续传主要通过以下步骤完成,核心是利用 HTTP 协议的 Range 请求头指定下载范围: 实现原理 Range 请求头:向服务器请求文件的特定字节范围(如 Range: bytes1024-) 本地文件记录:保存已…...
如何为服务器生成TLS证书
TLS(Transport Layer Security)证书是确保网络通信安全的重要手段,它通过加密技术保护传输的数据不被窃听和篡改。在服务器上配置TLS证书,可以使用户通过HTTPS协议安全地访问您的网站。本文将详细介绍如何在服务器上生成一个TLS证…...
【HarmonyOS 5 开发速记】如何获取用户信息(头像/昵称/手机号)
1.获取 authorizationCode: 2.利用 authorizationCode 获取 accessToken:文档中心 3.获取手机:文档中心 4.获取昵称头像:文档中心 首先创建 request 若要获取手机号,scope必填 phone,permissions 必填 …...
HDFS分布式存储 zookeeper
hadoop介绍 狭义上hadoop是指apache的一款开源软件 用java语言实现开源框架,允许使用简单的变成模型跨计算机对大型集群进行分布式处理(1.海量的数据存储 2.海量数据的计算)Hadoop核心组件 hdfs(分布式文件存储系统)&a…...
从 GreenPlum 到镜舟数据库:杭银消费金融湖仓一体转型实践
作者:吴岐诗,杭银消费金融大数据应用开发工程师 本文整理自杭银消费金融大数据应用开发工程师在StarRocks Summit Asia 2024的分享 引言:融合数据湖与数仓的创新之路 在数字金融时代,数据已成为金融机构的核心竞争力。杭银消费金…...
Proxmox Mail Gateway安装指南:从零开始配置高效邮件过滤系统
💝💝💝欢迎莅临我的博客,很高兴能够在这里和您见面!希望您在这里可以感受到一份轻松愉快的氛围,不仅可以获得有趣的内容和知识,也可以畅所欲言、分享您的想法和见解。 推荐:「storms…...
springboot 日志类切面,接口成功记录日志,失败不记录
springboot 日志类切面,接口成功记录日志,失败不记录 自定义一个注解方法 import java.lang.annotation.ElementType; import java.lang.annotation.Retention; import java.lang.annotation.RetentionPolicy; import java.lang.annotation.Target;/***…...
