当前位置: 首页 > news >正文

ClickHouse(21)ClickHouse集成Kafka表引擎详细解析

文章目录

  • Kafka表集成引擎
    • 配置
      • Kerberos 支持
    • 虚拟列
  • 资料分享
  • 参考文章

Kafka表集成引擎

此引擎与Apache Kafka结合使用。

Kafka 特性:

  • 发布或者订阅数据流。
  • 容错存储机制。
  • 处理流数据。

老版Kafka集成表引擎参数格式:

Kafka(kafka_broker_list, kafka_topic_list, kafka_group_name, kafka_format[, kafka_row_delimiter, kafka_schema, kafka_num_consumers])

新版Kafka集成表引擎参数格式:

Kafka SETTINGSkafka_broker_list = 'localhost:9092',kafka_topic_list = 'topic1,topic2',kafka_group_name = 'group1',kafka_format = 'JSONEachRow',kafka_row_delimiter = '\n',kafka_schema = '',kafka_num_consumers = 2

必要参数:

  • kafka_broker_list – 以逗号分隔的 brokers 列表 (localhost:9092)。
  • kafka_topic_list – topic 列表 (my_topic)。
  • kafka_group_name – Kafka 消费组名称 (group1)。如果不希望消息在集群中重复,请在每个分片中使用相同的组名。
  • kafka_format – 消息体格式。使用与 SQL 部分的 FORMAT 函数相同表示方法,例如 JSONEachRow

可选参数:

  • kafka_row_delimiter - 每个消息体(记录)之间的分隔符。
  • kafka_schema – 如果解析格式需要一个 schema 时,此参数必填。
  • kafka_num_consumers – 单个表的消费者数量。默认值是:1,如果一个消费者的吞吐量不足,则指定更多的消费者。消费者的总数不应该超过 topic 中分区的数量,因为每个分区只能分配一个消费者。

ClickHouse可以接受和返回各种格式的数据。受支持的输入格式可用于提交给INSERT语句、从文件表(File,URL,HDFS或者外部目录)执行SELECT语句,受支持的输出格式可用于格式化SELECT语句的返回结果,或者通过INSERT写入到文件表。

以下kafka_format是支持的格式,ClickHouse可以接受和返回各种格式的数据。受支持的输入格式可用于提交给INSERT语句、从文件表(File,URL,HDFS或者外部目录)执行SELECT语句,受支持的输出格式可用于格式化SELECT语句的返回结果,或者通过INSERT写入到文件表。

格式输入输出
[TabSeparated]
[TabSeparatedRaw]
[TabSeparatedWithNames]
[TabSeparatedWithNamesAndTypes]
[Template]
[TemplateIgnoreSpaces]
[CSV]
[CSVWithNames]
[CustomSeparated]
[Values]
[Vertical]
[JSON]
[JSONAsString]
[JSONStrings]
[JSONCompact]
[JSONCompactStrings]
[JSONEachRow]
[JSONEachRowWithProgress]
[JSONStringsEachRow]
[JSONStringsEachRowWithProgress]
[JSONCompactEachRow]
[JSONCompactEachRowWithNamesAndTypes]
[JSONCompactStringsEachRow]
[JSONCompactStringsEachRowWithNamesAndTypes]
[TSKV]
[Pretty]
[PrettyCompact]
[PrettyCompactMonoBlock]
[PrettyNoEscapes]
[PrettySpace]
[Protobuf]
[ProtobufSingle]
[Avro]
[AvroConfluent]
[Parquet]
[Arrow]
[ArrowStream]
[ORC]
[RowBinary]
[RowBinaryWithNamesAndTypes]
[Native]
[Null]
[XML]
[CapnProto]
[LineAsString]
[Regexp]
[RawBLOB]

示例:

  CREATE TABLE queue (timestamp UInt64,level String,message String) ENGINE = Kafka('localhost:9092', 'topic', 'group1', 'JSONEachRow');SELECT * FROM queue LIMIT 5;CREATE TABLE queue2 (timestamp UInt64,level String,message String) ENGINE = Kafka SETTINGS kafka_broker_list = 'localhost:9092',kafka_topic_list = 'topic',kafka_group_name = 'group1',kafka_format = 'JSONEachRow',kafka_num_consumers = 4;CREATE TABLE queue2 (timestamp UInt64,level String,message String) ENGINE = Kafka('localhost:9092', 'topic', 'group1')SETTINGS kafka_format = 'JSONEachRow',kafka_num_consumers = 4;

消费的消息会被自动追踪,因此每个消息在不同的消费组里只会记录一次。如果希望获得两次数据,则使用另一个组名创建副本。

消费组可以灵活配置并且在集群之间同步。例如,如果群集中有10个主题和5个表副本,则每个副本将获得2个主题。 如果副本数量发生变化,主题将自动在副本中重新分配。

SELECT 查询对于读取消息并不是很有用(调试除外),因为每条消息只能被读取一次。使用物化视图创建实时线程更实用。您可以这样做:

  1. 使用引擎创建一个 Kafka 消费者并作为一条数据流。
  2. 创建一个结构表。
  3. 创建物化视图,改视图会在后台转换引擎中的数据并将其放入之前创建的表中。

MATERIALIZED VIEW 添加至引擎,它将会在后台收集数据。可以持续不断地从 Kafka 收集数据并通过 SELECT 将数据转换为所需要的格式。

示例:

  CREATE TABLE queue (timestamp UInt64,level String,message String) ENGINE = Kafka('localhost:9092', 'topic', 'group1', 'JSONEachRow');CREATE TABLE daily (day Date,level String,total UInt64) ENGINE = SummingMergeTree(day, (day, level), 8192);CREATE MATERIALIZED VIEW consumer TO dailyAS SELECT toDate(toDateTime(timestamp)) AS day, level, count() as totalFROM queue GROUP BY day, level;SELECT level, sum(total) FROM daily GROUP BY level;

为了提高性能,接受的消息被分组为max_insert_block_size大小的块。如果未在stream_flush_interval_ms毫秒内形成块,则不关心块的完整性,都会将数据刷新到表中。

停止接收主题数据或更改转换逻辑,请 detach 物化视图:

  DETACH TABLE consumer;ATTACH TABLE consumer;

如果使用 ALTER 更改目标表,为了避免目标表与视图中的数据之间存在差异,推荐停止物化视图。

配置

GraphiteMergeTree 类似,Kafka 引擎支持使用ClickHouse配置文件进行扩展配置。可以使用两个配置键:全局 (kafka) 和 主题级别 (kafka_*)。首先应用全局配置,然后应用主题级配置(如果存在)。

  <!-- Global configuration options for all tables of Kafka engine type --><kafka><debug>cgrp</debug><auto_offset_reset>smallest</auto_offset_reset></kafka><!-- Configuration specific for topic "logs" --><kafka_logs><retry_backoff_ms>250</retry_backoff_ms><fetch_min_bytes>100000</fetch_min_bytes></kafka_logs>

ClickHouse配置中使用下划线 (_) ,并不是使用点 (.)。例如,check.crcs=true 将是 <check_crcs>true</check_crcs>

Kerberos 支持

对于使用了kerberos的kafka, 将security_protocol 设置为sasl_plaintext就够了,如果kerberos的ticket是由操作系统获取和缓存的。
clickhouse也支持自己使用keyfile的方式来维护kerbros的凭证。配置sasl_kerberos_service_name、sasl_kerberos_keytab、sasl_kerberos_principal三个子元素就可以。

示例:

  <!-- Kerberos-aware Kafka --><kafka><security_protocol>SASL_PLAINTEXT</security_protocol><sasl_kerberos_keytab>/home/kafkauser/kafkauser.keytab</sasl_kerberos_keytab><sasl_kerberos_principal>kafkauser/kafkahost@EXAMPLE.COM</sasl_kerberos_principal></kafka>

虚拟列

  • _topic – Kafka 主题。
  • _key – 信息的键。
  • _offset – 消息的偏移量。
  • _timestamp – 消息的时间戳。
  • _timestamp_ms – 消息的时间戳(毫秒)。
  • _partition – Kafka 主题的分区。

资料分享

ClickHouse经典中文文档分享

参考文章

  • ClickHouse(01)什么是ClickHouse,ClickHouse适用于什么场景
  • ClickHouse(02)ClickHouse架构设计介绍概述与ClickHouse数据分片设计
  • ClickHouse(03)ClickHouse怎么安装和部署
  • ClickHouse(04)如何搭建ClickHouse集群
  • ClickHouse(05)ClickHouse数据类型详解
  • ClickHouse(06)ClickHouse建表语句DDL详细解析
  • ClickHouse(07)ClickHouse数据库引擎解析
  • ClickHouse(08)ClickHouse表引擎概况
  • ClickHouse(09)ClickHouse合并树MergeTree家族表引擎之MergeTree详细解析
  • ClickHouse(10)ClickHouse合并树MergeTree家族表引擎之ReplacingMergeTree详细解析
  • ClickHouse(11)ClickHouse合并树MergeTree家族表引擎之SummingMergeTree详细解析
  • ClickHouse(12)ClickHouse合并树MergeTree家族表引擎之AggregatingMergeTree详细解析
  • ClickHouse(13)ClickHouse合并树MergeTree家族表引擎之CollapsingMergeTree详细解析
  • ClickHouse(14)ClickHouse合并树MergeTree家族表引擎之VersionedCollapsingMergeTree详细解析
  • ClickHouse(15)ClickHouse合并树MergeTree家族表引擎之GraphiteMergeTree详细解析
  • ClickHouse(16)ClickHouse日志引擎Log详细解析
  • ClickHouse(17)ClickHouse集成JDBC表引擎详细解析
  • ClickHouse(18)ClickHouse集成ODBC表引擎详细解析
  • ClickHouse(19)ClickHouse集成Hive表引擎详细解析
  • ClickHouse(20)ClickHouse集成PostgreSQL表引擎详细解析

相关文章:

ClickHouse(21)ClickHouse集成Kafka表引擎详细解析

文章目录 Kafka表集成引擎配置Kerberos 支持 虚拟列 资料分享参考文章 Kafka表集成引擎 此引擎与Apache Kafka结合使用。 Kafka 特性&#xff1a; 发布或者订阅数据流。容错存储机制。处理流数据。 老版Kafka集成表引擎参数格式&#xff1a; Kafka(kafka_broker_list, kaf…...

JSP-概念

一、引子 很多读者可能听过JSP&#xff0c;并且知道这是一门过时的技术了。在Spring&#xff0c;SpringBoot已经成为主流的今天&#xff0c;笔者为什么还要介绍JSP的相关内容呢&#xff1f;笔者常常提到一个概念&#xff1a;理解一门技术&#xff0c;要理解这个技术为什么产生…...

sqlite插入语句id自增列问题

sqlite给主键id设置AUTOINCREMENT自增在插入数据的时候报错table has x columns but x-1 values were supplied 为什么自增列要显示不提供,sqlite需要提供自增列table ResTools has 7 columns but 6 values were supplied SQL Statement:insert into ResTools values(管理系统w…...

C#,字符串匹配(模式搜索)AC(Aho Corasick)算法的源代码

Aho-Corasick算法简称AC算法&#xff0c;也称为AC自动机(Aho-Corasick)算法&#xff0c;1975年产生于贝尔实验室&#xff08;The Bell Labs&#xff09;&#xff0c;是一种用于解决多模式字符串匹配的经典算法之一。 the Bell Lab 本文的运行效果&#xff1a; AC算法以模式树…...

【网络取证篇】Windows终端无法使用ping命令解决方法

【网络取证篇】Windows终端无法使用ping命令解决方法 以Ping命令为例&#xff0c;最近遇到ping命令无法使用的情况&#xff0c;很多情况都是操作系统"环境变量"被改变或没有正确配置导致—【蘇小沐】 目录 1、实验环境&#xff08;一&#xff09;无法ping命令 &a…...

electron+vue网页直接播放RTSP视频流?

目前大部分摄像头都支持RTSP协议&#xff0c;但是在浏览器限制&#xff0c;最新版的浏览器都不能直接播放RTSP协议&#xff0c;Electron 桌面应用是基于 Chromium 内核的&#xff0c;所以也不能直接播放RTSP&#xff0c;但是我们又有这个需求怎么办呢&#xff1f; 市场上的方案…...

【Delphi 基础知识 19】Assigned的用法

在Delphi中&#xff0c;Assigned 是一个用于检查指针是否已分配内存的函数。它通常用于检查对象或指针是否已经被分配内存&#xff0c;以避免在未分配内存的情况下引用或操作它。 以下是 Assigned 的一些用法示例&#xff1a; 检查对象是否已分配内存&#xff1a; varMyObject…...

多线程在编程中的重要性有什么?并以LabVIEW为例进行说明

多线程在编程中的重要性体现在以下几个方面&#xff1a; 并行处理&#xff1a; 多线程允许程序同时执行多个任务&#xff0c;这在现代多核心处理器上尤其重要。通过并行处理&#xff0c;可以显著提高程序的执行效率和响应速度。 资源利用最大化&#xff1a; 通过多线程&#x…...

K8S---kubectl top

一、简介 该命令类似于linux–top命令,用于显示node和pod的CPU和内存使用情况 二、命令行 1、help命令 k top --help Display resource (CPU/memory) usage. The top command allows you to see the resource consumption for nodes or pods. This command requires Metri…...

Linux部署前后端项目

部署SpringBoot项目 创建SpringBoot项目 先确保有一个可以运行的springboot项目&#xff0c;这里就记录创建项目的流程了&#xff0c;可以自行百度。 命令行启动 2.1、在linux中&#xff0c;我是在data目录下新创建的一个project目录&#xff08;此目录创建位置不限制&…...

一文搞懂系列——Linux C线程池技术

背景 最近在走读诊断项目代码时&#xff0c;发现其用到了线程池技术&#xff0c;感觉耳目一新。以前基本只是听过线程池&#xff0c;但是并没有实际应用。对它有一丝的好奇&#xff0c;于是趁这个机会深入了解一下线程池的实现原理。 线程池的优点 线程池出现的背景&#xf…...

stable diffusion代码学习笔记

前言&#xff1a;本文没有太多公式推理&#xff0c;只有一些简单的公式&#xff0c;以及公式和代码的对应关系。本文仅做个人学习笔记&#xff0c;如有理解错误的地方&#xff0c;请指出。 本文包含stable diffusion入门文献和不同版本的代码。 文献资源 本文学习的代码&…...

腾讯云服务器怎么买?两种购买方式更省钱

腾讯云服务器购买流程很简单&#xff0c;有两种购买方式&#xff0c;直接在官方活动上购买比较划算&#xff0c;在云服务器CVM或轻量应用服务器页面自定义购买价格比较贵&#xff0c;但是自定义购买云服务器CPU内存带宽配置选择范围广&#xff0c;活动上购买只能选择固定的活动…...

基于SpringBoot自定义控制是否需要开启定时功能

在基于SpringBoot的开发过程中&#xff0c;有时候会在应用中使用定时任务&#xff0c;然后服务器上启动定时任务&#xff0c;本地就不需要开启定时任务&#xff0c;使用一个参数进行控制&#xff0c;通过查资料得知非常简单。 参数配置 在application-dev.yml中加入如下配置 …...

“确定要在不复制其属性的情况下复制此文件?”解决方案(将U盘格式由FAT格式转换为NTFS格式)

文章目录 1.问题描述2.问题分析3.问题解决3.1 方法一3.2 方法二3.3 方法三 1.问题描述 从电脑上复制文件到U盘里会出现“确定要在不复制其属性的情况下复制此文件&#xff1f;”提示。 2.问题分析 如果这个文件在NTFS分区上&#xff0c;且存在特殊的安全属性。那么把它从NT…...

视频监控系统EasyCVR如何通过调用API接口查询和下载设备录像?

智慧安防平台EasyCVR是基于各种IP流媒体协议传输的视频汇聚和融合管理平台。视频流媒体服务器EasyCVR采用了开放式的网络结构&#xff0c;支持高清视频的接入和传输、分发&#xff0c;平台提供实时远程视频监控、视频录像、录像回放与存储、告警、语音对讲、云台控制、平台级联…...

15.鸿蒙HarmonyOS App(JAVA)进度条与圆形进度条

15.鸿蒙HarmonyOS App(JAVA)进度条与圆形进度条 progressBar2.setIndeterminate(true);//设置无限模式,运行查看动态效果 //创建并设置无限模式元素 ShapeElement element new ShapeElement(); element.setBounds(0,0,50,50); element.setRgbColor(new RgbColor(255,0,0)); …...

【FastAPI】路径参数

路径参数 from fastapi import FastAPIapp FastAPI()app.get("/items/{item_id}") async def read_item(item_id):return {"item_id": item_id}其中{item_id}就为路径参数 运行以上程序当访问 &#xff1a;http://127.0.0.1:8000/items/fastapi时候 将会…...

【docker笔记】DockerFile

DockerFile Docker镜像结构的分层 镜像不是一个单一的文件&#xff0c;而是有多层构成。 容器其实是在镜像的最上面加了一层读写层&#xff0c;在运行容器里做的任何文件改动&#xff0c;都会写到这个读写层。 如果删除了容器&#xff0c;也就是删除了其最上面的读写层&…...

React项目搭建流程

第一步 利用脚手架创建ts类型的react项目&#xff1a; 执行如下的命令&#xff1a;create-react-app myDemo --template typescript &#xff1b; 第二步 清理项目目录结构&#xff1a; src/ index.tsx, app.txs, react-app-env.d.ts public/index.ht…...

Qt/C++开发监控GB28181系统/取流协议/同时支持udp/tcp被动/tcp主动

一、前言说明 在2011版本的gb28181协议中&#xff0c;拉取视频流只要求udp方式&#xff0c;从2016开始要求新增支持tcp被动和tcp主动两种方式&#xff0c;udp理论上会丢包的&#xff0c;所以实际使用过程可能会出现画面花屏的情况&#xff0c;而tcp肯定不丢包&#xff0c;起码…...

Python:操作 Excel 折叠

💖亲爱的技术爱好者们,热烈欢迎来到 Kant2048 的博客!我是 Thomas Kant,很开心能在CSDN上与你们相遇~💖 本博客的精华专栏: 【自动化测试】 【测试经验】 【人工智能】 【Python】 Python 操作 Excel 系列 读取单元格数据按行写入设置行高和列宽自动调整行高和列宽水平…...

vue3 定时器-定义全局方法 vue+ts

1.创建ts文件 路径&#xff1a;src/utils/timer.ts 完整代码&#xff1a; import { onUnmounted } from vuetype TimerCallback (...args: any[]) > voidexport function useGlobalTimer() {const timers: Map<number, NodeJS.Timeout> new Map()// 创建定时器con…...

三体问题详解

从物理学角度&#xff0c;三体问题之所以不稳定&#xff0c;是因为三个天体在万有引力作用下相互作用&#xff0c;形成一个非线性耦合系统。我们可以从牛顿经典力学出发&#xff0c;列出具体的运动方程&#xff0c;并说明为何这个系统本质上是混沌的&#xff0c;无法得到一般解…...

.Net Framework 4/C# 关键字(非常用,持续更新...)

一、is 关键字 is 关键字用于检查对象是否于给定类型兼容,如果兼容将返回 true,如果不兼容则返回 false,在进行类型转换前,可以先使用 is 关键字判断对象是否与指定类型兼容,如果兼容才进行转换,这样的转换是安全的。 例如有:首先创建一个字符串对象,然后将字符串对象隐…...

算法岗面试经验分享-大模型篇

文章目录 A 基础语言模型A.1 TransformerA.2 Bert B 大语言模型结构B.1 GPTB.2 LLamaB.3 ChatGLMB.4 Qwen C 大语言模型微调C.1 Fine-tuningC.2 Adapter-tuningC.3 Prefix-tuningC.4 P-tuningC.5 LoRA A 基础语言模型 A.1 Transformer &#xff08;1&#xff09;资源 论文&a…...

【分享】推荐一些办公小工具

1、PDF 在线转换 https://smallpdf.com/cn/pdf-tools 推荐理由&#xff1a;大部分的转换软件需要收费&#xff0c;要么功能不齐全&#xff0c;而开会员又用不了几次浪费钱&#xff0c;借用别人的又不安全。 这个网站它不需要登录或下载安装。而且提供的免费功能就能满足日常…...

AI+无人机如何守护濒危物种?YOLOv8实现95%精准识别

【导读】 野生动物监测在理解和保护生态系统中发挥着至关重要的作用。然而&#xff0c;传统的野生动物观察方法往往耗时耗力、成本高昂且范围有限。无人机的出现为野生动物监测提供了有前景的替代方案&#xff0c;能够实现大范围覆盖并远程采集数据。尽管具备这些优势&#xf…...

jmeter聚合报告中参数详解

sample、average、min、max、90%line、95%line,99%line、Error错误率、吞吐量Thoughput、KB/sec每秒传输的数据量 sample&#xff08;样本数&#xff09; 表示测试中发送的请求数量&#xff0c;即测试执行了多少次请求。 单位&#xff0c;以个或者次数表示。 示例&#xff1a;…...

从“安全密码”到测试体系:Gitee Test 赋能关键领域软件质量保障

关键领域软件测试的"安全密码"&#xff1a;Gitee Test如何破解行业痛点 在数字化浪潮席卷全球的今天&#xff0c;软件系统已成为国家关键领域的"神经中枢"。从国防军工到能源电力&#xff0c;从金融交易到交通管控&#xff0c;这些关乎国计民生的关键领域…...