当前位置: 首页 > news >正文

Flink消费Kafka实时写入Doris

本文模拟实际生产环境,通过FileBeat采集日志信息到Kafka,再通过Flink消费Kafka实时写入Doris。

文章目录

    • Filebeat采集日志到Kafka
    • Flink消费Kafka实时写入Doris
    • 总结

在这里插入图片描述

Filebeat采集日志到Kafka

常见的日志采集工具有以下几种:Flume、Logstash和Filebeat

  • Flume采用Java编写,它是一个分布式、高度可靠且高度可用的工具,旨在高效地搜集、汇总和转移大量日志数据,该工具拥有一个简洁且灵活的流数据流架构,它配备了可调节的可靠性机制、故障切换以及恢复功能,此外,Flume通过简单且可扩展的数据模型支持在线分析应用程序。
  • Logstash是一个开源的日志管理和分析工具,它能够从多个数据源收集数据,对数据进行转换和清洗,并将处理后的数据传输到目标系统。
  • Filebeat是一款go语言编写的日志文件收集工具,当在服务器上部署其客户端后,它会持续监听特定的日志目录或日志文件,实时跟踪并读取这些文件的更新内容,并将这些数据发送到指定的输出目标,例如Elasticsearch或Kafka等。

这里选择Filebeat进行日志采集的主要原因在于其资源消耗极低,相较于Flume和Logstash,Filebeat占用的内存最少,对CPU的负载也最小。它的运行进程十分稳定,很少出现崩溃或宕机的情况。

首先下载Filebeat

curl -L -O https://artifacts.elastic.co/downloads/beats/filebeat/filebeat-8.12.0-linux-x86_64.tar.gz

在这里插入图片描述
解压缩文件

tar xzvf filebeat-8.12.0-linux-x86_64.tar.gz

进入目录

cd filebeat-8.12.0-linux-x86_64

编写配置文件接入Kafka

vim filebeat.yaml

filebeat.yaml的文件内容

filebeat.inputs:
- type: logpaths:- /doc/input/*.log  # 更换为你的日志文件路径
processors:- include_fields:fields: ["message"]
output.kafka:# 更换为你的Kafka地址和主题.hosts: ["192.168.235.130:9092"]topic: k2ggcodec:format:string: '%{[message]}'

运行Filebeat采集日志

./filebeat -e -c ./filebeat.yaml

在这里插入图片描述

这是log日志的信息,现要求保持原始格式发送到Kafka
在这里插入图片描述Filebeat采集日志信息发送到Kafka的主题,消费者收到的信息如下,Filebeat会添加一些自带的数据,比如时间戳和元数据等,但是一般情况下只需要采集message里面的信息,通过filebeat.yaml中的processors和codec即可实现。
在这里插入图片描述processors处理只保留 message 的字段信息,其他字段将被丢弃,codec用于定义数据的编码格式,将 message 字段的值作为字符串发送到 kafka,这样就可以保留日志信息的原始格式发送到Kafka。
消费者消费原始格式的日志消息
在这里插入图片描述

Flink消费Kafka实时写入Doris

在写入之前,建立doris的数据表用于接收消费的信息

CREATE TABLE transactions (timestamp datetime,user_id INT,transaction_type VARCHAR(50),amount DECIMAL(15, 2),currency CHAR(3),status VARCHAR(20),description TEXT
)
DISTRIBUTED BY HASH(user_id) BUCKETS 10
PROPERTIES("replication_num"="1");

引入依赖

   <dependency><groupId>org.apache.doris</groupId><artifactId>flink-doris-connector-1.16</artifactId><version>24.0.0</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-core</artifactId><version>1.17.0</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients</artifactId><version>1.17.0</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>1.17.0</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka</artifactId><version>1.17.0</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-common</artifactId><version>1.17.0</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-java</artifactId><version>1.17.0</version></dependency>

主程序

package flink;import org.apache.doris.flink.cfg.DorisExecutionOptions;
import org.apache.doris.flink.cfg.DorisOptions;
import org.apache.doris.flink.cfg.DorisReadOptions;
import org.apache.doris.flink.sink.DorisSink;
import org.apache.doris.flink.sink.writer.serializer.SimpleStringSerializer;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.types.DataType;import java.util.Properties;public class DorisWrite {public static void main(String[] args) throws Exception {Properties props = new Properties();//Kafka broker的地址props.put("bootstrap.servers", "192.168.235.130:9092");props.put("group.id", "test");props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("auto.offset.reset", "latest");StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.enableCheckpointing(5000);env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);//指定消费的主题FlinkKafkaConsumer<String> flinkKafkaConsumer = new FlinkKafkaConsumer<>("k2gg",new SimpleStringSchema(),props);DorisSink.Builder<String> builder = DorisSink.builder();DorisOptions.Builder dorisBuilder = DorisOptions.builder();//Doris的地址以及账号密码等信息dorisBuilder.setFenodes("192.168.235.130:8030").setTableIdentifier("test.transactions").setUsername("root").setPassword("1445413748");Properties pro = new Properties();pro.setProperty("format", "json");pro.setProperty("read_json_by_line", "true");DorisExecutionOptions  executionOptions = DorisExecutionOptions.builder().setLabelPrefix("label-doris12"+System.currentTimeMillis()) //streamload label prefix,.setStreamLoadProp(pro).build();builder.setDorisReadOptions(DorisReadOptions.builder().build()).setDorisExecutionOptions(executionOptions).setSerializer(new SimpleStringSerializer()) //serialize according to string.setDorisOptions(dorisBuilder.build());DataStreamSource<String> dataStreamSource = env.addSource(flinkKafkaConsumer);// 将Kafka数据转换为JSON格式DataStream<String> jsonStream = dataStreamSource.map(new MapFunction<String, String>() {@Overridepublic String map(String value) throws Exception {System.out.println("value"+value);// 分割字符串String[] parts = value.split(",");// 创建JSON字符串StringBuilder jsonString = new StringBuilder();jsonString.append("{");jsonString.append("\"timestamp\":\"").append(parts[0]).append("\",");jsonString.append("\"user_id\":").append(parts[1]).append(",");jsonString.append("\"transaction_type\":\"").append(parts[2]).append("\",");jsonString.append("\"amount\":").append(parts[3]).append(",");jsonString.append("\"currency\":\"").append(parts[4]).append("\",");jsonString.append("\"status\":\"").append(parts[5]).append("\",");jsonString.append("\"description\":\"").append(parts[6].replace("\"", "")).append("\"");jsonString.append("}");return jsonString.toString();}});jsonStream.print();jsonStream.sinkTo(builder.build());env.execute("flink kafka to doris by datastream");}
}

运行主程序通过Flink消费Kafka的信息写入doris
在这里插入图片描述log日志的信息
在这里插入图片描述
登录Doris进行验证

mysql -h k8s-master -P 9030 -uroot -p

这是没运行主程序之前doris的数据,没有2024-10-15这一天的数据。

select * from transactions where date(timestamp) = "2024-10-15";

在这里插入图片描述
运行主程序之后,Flink将Kafka主题的信息实时写入Doris。
在这里插入图片描述

总结

1.Filebeat格式问题
Filebeat采集日志格式会添加一些自带的额外信息,一般情况下只需要message里面的字段信息,那么yaml文件配置processors和codec属性即可。processors处理只保留 message 的字段信息,其他字段将被丢弃,codec用于定义数据的编码格式,将 message 字段的值作为字符串发送到 kafka,这样就可以保留日志信息的原始格式发送到Kafka。
2.Flink消费Kafka失败
Flink在消费Kafka主题的过程中,不要往该主题发送其他格式的数据,否则会解析失败,尽量新建一个新主题来接收Filebeat采集过来的日志信息。如果还是执行失败,可以尝试在setLabelPrefix添加一个时间戳,这样保证每次生成的标签前缀都不一样,这是因为客户端会生成一个唯一的标签来标识这次导入Doris的操作,Doris服务器会根据这个标签来跟踪导入的进度和状态,如果导入过程中出现问题,Doris会保留失败的数据,客户端就可以通过标签重新导入这些数据。
3.实时写入Doris失败
Flink处理字段的数据类型要与Doris匹配,可以参考官方文档Doris 和 Flink 列类型映射关系。

相关文章:

Flink消费Kafka实时写入Doris

本文模拟实际生产环境&#xff0c;通过FileBeat采集日志信息到Kafka&#xff0c;再通过Flink消费Kafka实时写入Doris。 文章目录 Filebeat采集日志到KafkaFlink消费Kafka实时写入Doris总结 Filebeat采集日志到Kafka 常见的日志采集工具有以下几种&#xff1a;Flume、Logstash和…...

实现Web QQ音乐打开现有新标签页切换音乐

若没有打开播放音乐标签页&#xff0c;则打开新标签页播放所选音乐如果已打开新标签页&#xff0c;则直接切换所选音乐 pageA.vue <script setup lang"ts"> const tab2 ref<any>(null); const router useRouter();interface Track {id: number;name: …...

从底层结构开始学习FPGA(15)----时钟结构(通俗版)

目录 0、前言 1、IO Bank和Clock Region(时钟区域)是一个东西吗? 2、时钟输入管脚 3、时钟架构 3.1、全局时钟BUFG 3.2、水平时钟BUFH 3.3、IO时钟BUFIO 3.4、区域时钟BUFR/BUFMR 4、总结 《从底层结构开始学习FPGA》目录与传送门 0、前言 我思来想去,总觉…...

MacOS Sublime Text 解决中乱码

1. 安装Package Control 官方安装指南 手动安装 通过以此点击菜单 Sublime Text > Preferences > Browse Packages 打开Packages目录找到Packages的同级目录Installed Packages下载PackageControl.sublime-package并保存到Installed Packages中在菜单 Sublime Text &g…...

Python画笔案例-084 绘制 3D立方体

1、绘制 3D立方体 通过 python 的turtle 库绘制 3D立方体,如下图: 2、实现代码 绘制 3D立方体,以下为实现代码: import turtle import timeviewfactor = 150 xshift = 0 yshift = 0 zshift = 50...

“八股文”面试:助力、阻力还是空谈?

在当今的IT行业&#xff0c;面试程序员时提及“八股文”已成为一种普遍现象。所谓“八股文”&#xff0c;通常指的是一系列固定的、标准化的面试问题及其解答&#xff0c;这些问题往往涵盖了计算机科学和软件工程的基础知识&#xff0c;以及一些流行的技术框架和算法。然而&…...

如何实现弹出式窗口

文章目录 1 概念介绍2 使用方法3 示例代码我们在上一章回中介绍了Sliver综合示例相关的内容,本章回中将介绍PopupMenuButton组件.闲话休提,让我们一起Talk Flutter吧。 1 概念介绍 我们在本章回中介绍的PopupMenuButton组件位于AppBar右侧,通常显示三个圆点图标,点击该图标…...

Lua 函数

Lua 函数 Lua 是一种轻量级的编程语言&#xff0c;广泛用于游戏开发、脚本编写和其他应用程序中。在 Lua 中&#xff0c;函数是一等公民&#xff0c;这意味着它们可以被存储在变量中&#xff0c;作为参数传递给其他函数&#xff0c;以及作为其他函数的返回值。本文将详细介绍 …...

HTML_文本标签

概念&#xff1a; 1、用于包裹&#xff1a;词汇、短语等。 2、通常写在排版标签里面。 3、排版标签更宏观(大段的文字)&#xff0c;文本标签更微观(词汇、短语)。 4、文本标签通常都是行内元素。 常用的文本标签 标签名 全称 标签语义em Emphasized 加重(文本)。要着重阅…...

基于SpringBoot+Vue+uniapp的诗词学习系统的详细设计和实现(源码+lw+部署文档+讲解等)

详细视频演示 请联系我获取更详细的演示视频 项目运行截图 技术框架 后端采用SpringBoot框架 Spring Boot 是一个用于快速开发基于 Spring 框架的应用程序的开源框架。它采用约定大于配置的理念&#xff0c;提供了一套默认的配置&#xff0c;让开发者可以更专注于业务逻辑而不…...

健康睡眠的重要性

在快节奏的现代生活中&#xff0c;健康养生已成为人们日益关注的话题&#xff0c;而睡眠&#xff0c;这一看似平凡却至关重要的生理需求&#xff0c;往往被忽视在忙碌的缝隙中。今天&#xff0c;让我们深入探讨健康养生中的睡眠艺术&#xff0c;它不仅关乎身体的休息与恢复&…...

知道ip地址怎么看网络地址

在计算机网络的世界里&#xff0c;IP地址是设备之间通信的基础。然而&#xff0c;仅仅知道一个设备的IP地址并不足以完全理解它在网络中的位置和作用。网络地址&#xff0c;作为IP地址的一个重要组成部分&#xff0c;为我们提供了关于设备所属网络的更多信息。本文将深入探讨如…...

精心整理85道Java微服务面试题(含答案)

微服务 面试题 1、您对微服务有何了解&#xff1f; 2、微服务架构有哪些优势&#xff1f; 3。微服务有哪些特点&#xff1f; 4、设计微服务的最佳实践是什么&#xff1f; 5、微服务架构如何运作&#xff1f; 6、微服务架构的优缺点是什么&#xff1f; 7、单片&#xff0…...

MongoDB聚合管道(Aggregation Pipeline)

聚合管道&#xff08;Aggregation Pipeline&#xff09;是MongoDB中用于对数据进行处理和分析的一种强大机制。它由一系列的阶段&#xff08;Stage&#xff09;组成&#xff0c;每个阶段对输入的数据进行一种特定的操作&#xff0c;然后将结果传递给下一个阶段&#xff0c;就像…...

移情别恋c++ ദ്ദി˶ー̀֊ー́ ) ——6.vector(无习题)

C 中的 vector 容器详细总结 1. 什么是 vector&#xff1f; vector 是 C 标准模板库 (STL) 中的一种动态数组容器。它的底层实现是一个可以自动扩展的数组&#xff0c;支持随机访问和动态调整大小&#xff0c;是 C 中最常用的序列容器之一。vector 在插入、删除、遍历以及随机…...

SpringBoot技术支持的桂林景点导航

2相关技术 2.1 MYSQL数据库 MySQL是一个真正的多用户、多线程SQL数据库服务器。 是基于SQL的客户/服务器模式的关系数据库管理系统&#xff0c;它的有点有有功能强大、使用简单、管理方便、安全可靠性高、运行速度快、多线程、跨平台性、完全网络化、稳定性等&#xff0c;非常…...

利用vmware在移动硬盘安装Ubuntu2go

安装 买个移动硬盘&#xff0c;usb插电脑&#xff0c;磁盘管理看磁盘序列号 vmware新建虚拟机 这一步选择磁盘管理里面看到的磁盘4 先不要开机&#xff0c;选择设置里面UEFI 和安装正常Ubuntu一致操作即可&#xff0c;这里可以不选高级&#xff0c;默认一个引导分区&…...

Spring Boot:中小型医院网站的敏捷开发

摘 要 本基于Spring Boot的中小型医院网站设计目标是实现用户网络预约挂号的功能&#xff0c;同时提高医院管理效率&#xff0c;更好的为广大用户服务。 本文重点阐述了中小型医院网站的开发过程&#xff0c;以实际运用为开发背景&#xff0c;基于Spring Boot框架&#xff0c;运…...

241011-在jupyter中实现文件夹压缩后下载

241011-在jupyter中实现文件夹压缩后下载 在使用jupyter notebook过程中&#xff0c;我们经常会遇到成堆的文件无法批量下载的问题&#xff0c;这里提供压缩文件夹代码&#xff0c;压缩后即可右键文件选择download实现批量下载 import zipfile import os# 设置你想要压缩的文…...

.NET 一款用于转储指定进程内存的工具

01阅读须知 此文所提供的信息只为网络安全人员对自己所负责的网站、服务器等&#xff08;包括但不限于&#xff09;进行检测或维护参考&#xff0c;未经授权请勿利用文章中的技术资料对任何计算机系统进行入侵操作。利用此文所提供的信息而造成的直接或间接后果和损失&#xf…...

谷歌浏览器插件

项目中有时候会用到插件 sync-cookie-extension1.0.0&#xff1a;开发环境同步测试 cookie 至 localhost&#xff0c;便于本地请求服务携带 cookie 参考地址&#xff1a;https://juejin.cn/post/7139354571712757767 里面有源码下载下来&#xff0c;加在到扩展即可使用FeHelp…...

安宝特方案丨XRSOP人员作业标准化管理平台:AR智慧点检验收套件

在选煤厂、化工厂、钢铁厂等过程生产型企业&#xff0c;其生产设备的运行效率和非计划停机对工业制造效益有较大影响。 随着企业自动化和智能化建设的推进&#xff0c;需提前预防假检、错检、漏检&#xff0c;推动智慧生产运维系统数据的流动和现场赋能应用。同时&#xff0c;…...

HTML 列表、表格、表单

1 列表标签 作用&#xff1a;布局内容排列整齐的区域 列表分类&#xff1a;无序列表、有序列表、定义列表。 例如&#xff1a; 1.1 无序列表 标签&#xff1a;ul 嵌套 li&#xff0c;ul是无序列表&#xff0c;li是列表条目。 注意事项&#xff1a; ul 标签里面只能包裹 li…...

大语言模型如何处理长文本?常用文本分割技术详解

为什么需要文本分割? 引言:为什么需要文本分割?一、基础文本分割方法1. 按段落分割(Paragraph Splitting)2. 按句子分割(Sentence Splitting)二、高级文本分割策略3. 重叠分割(Sliding Window)4. 递归分割(Recursive Splitting)三、生产级工具推荐5. 使用LangChain的…...

如何为服务器生成TLS证书

TLS&#xff08;Transport Layer Security&#xff09;证书是确保网络通信安全的重要手段&#xff0c;它通过加密技术保护传输的数据不被窃听和篡改。在服务器上配置TLS证书&#xff0c;可以使用户通过HTTPS协议安全地访问您的网站。本文将详细介绍如何在服务器上生成一个TLS证…...

Spring AI 入门:Java 开发者的生成式 AI 实践之路

一、Spring AI 简介 在人工智能技术快速迭代的今天&#xff0c;Spring AI 作为 Spring 生态系统的新生力量&#xff0c;正在成为 Java 开发者拥抱生成式 AI 的最佳选择。该框架通过模块化设计实现了与主流 AI 服务&#xff08;如 OpenAI、Anthropic&#xff09;的无缝对接&…...

拉力测试cuda pytorch 把 4070显卡拉满

import torch import timedef stress_test_gpu(matrix_size16384, duration300):"""对GPU进行压力测试&#xff0c;通过持续的矩阵乘法来最大化GPU利用率参数:matrix_size: 矩阵维度大小&#xff0c;增大可提高计算复杂度duration: 测试持续时间&#xff08;秒&…...

怎么让Comfyui导出的图像不包含工作流信息,

为了数据安全&#xff0c;让Comfyui导出的图像不包含工作流信息&#xff0c;导出的图像就不会拖到comfyui中加载出来工作流。 ComfyUI的目录下node.py 直接移除 pnginfo&#xff08;推荐&#xff09;​​ 在 save_images 方法中&#xff0c;​​删除或注释掉所有与 metadata …...

STM32---外部32.768K晶振(LSE)无法起振问题

晶振是否起振主要就检查两个1、晶振与MCU是否兼容&#xff1b;2、晶振的负载电容是否匹配 目录 一、判断晶振与MCU是否兼容 二、判断负载电容是否匹配 1. 晶振负载电容&#xff08;CL&#xff09;与匹配电容&#xff08;CL1、CL2&#xff09;的关系 2. 如何选择 CL1 和 CL…...

Chromium 136 编译指南 Windows篇:depot_tools 配置与源码获取(二)

引言 工欲善其事&#xff0c;必先利其器。在完成了 Visual Studio 2022 和 Windows SDK 的安装后&#xff0c;我们即将接触到 Chromium 开发生态中最核心的工具——depot_tools。这个由 Google 精心打造的工具集&#xff0c;就像是连接开发者与 Chromium 庞大代码库的智能桥梁…...