当前位置: 首页 > news >正文

Flink消费Kafka实时写入Doris

本文模拟实际生产环境,通过FileBeat采集日志信息到Kafka,再通过Flink消费Kafka实时写入Doris。

文章目录

    • Filebeat采集日志到Kafka
    • Flink消费Kafka实时写入Doris
    • 总结

在这里插入图片描述

Filebeat采集日志到Kafka

常见的日志采集工具有以下几种:Flume、Logstash和Filebeat

  • Flume采用Java编写,它是一个分布式、高度可靠且高度可用的工具,旨在高效地搜集、汇总和转移大量日志数据,该工具拥有一个简洁且灵活的流数据流架构,它配备了可调节的可靠性机制、故障切换以及恢复功能,此外,Flume通过简单且可扩展的数据模型支持在线分析应用程序。
  • Logstash是一个开源的日志管理和分析工具,它能够从多个数据源收集数据,对数据进行转换和清洗,并将处理后的数据传输到目标系统。
  • Filebeat是一款go语言编写的日志文件收集工具,当在服务器上部署其客户端后,它会持续监听特定的日志目录或日志文件,实时跟踪并读取这些文件的更新内容,并将这些数据发送到指定的输出目标,例如Elasticsearch或Kafka等。

这里选择Filebeat进行日志采集的主要原因在于其资源消耗极低,相较于Flume和Logstash,Filebeat占用的内存最少,对CPU的负载也最小。它的运行进程十分稳定,很少出现崩溃或宕机的情况。

首先下载Filebeat

curl -L -O https://artifacts.elastic.co/downloads/beats/filebeat/filebeat-8.12.0-linux-x86_64.tar.gz

在这里插入图片描述
解压缩文件

tar xzvf filebeat-8.12.0-linux-x86_64.tar.gz

进入目录

cd filebeat-8.12.0-linux-x86_64

编写配置文件接入Kafka

vim filebeat.yaml

filebeat.yaml的文件内容

filebeat.inputs:
- type: logpaths:- /doc/input/*.log  # 更换为你的日志文件路径
processors:- include_fields:fields: ["message"]
output.kafka:# 更换为你的Kafka地址和主题.hosts: ["192.168.235.130:9092"]topic: k2ggcodec:format:string: '%{[message]}'

运行Filebeat采集日志

./filebeat -e -c ./filebeat.yaml

在这里插入图片描述

这是log日志的信息,现要求保持原始格式发送到Kafka
在这里插入图片描述Filebeat采集日志信息发送到Kafka的主题,消费者收到的信息如下,Filebeat会添加一些自带的数据,比如时间戳和元数据等,但是一般情况下只需要采集message里面的信息,通过filebeat.yaml中的processors和codec即可实现。
在这里插入图片描述processors处理只保留 message 的字段信息,其他字段将被丢弃,codec用于定义数据的编码格式,将 message 字段的值作为字符串发送到 kafka,这样就可以保留日志信息的原始格式发送到Kafka。
消费者消费原始格式的日志消息
在这里插入图片描述

Flink消费Kafka实时写入Doris

在写入之前,建立doris的数据表用于接收消费的信息

CREATE TABLE transactions (timestamp datetime,user_id INT,transaction_type VARCHAR(50),amount DECIMAL(15, 2),currency CHAR(3),status VARCHAR(20),description TEXT
)
DISTRIBUTED BY HASH(user_id) BUCKETS 10
PROPERTIES("replication_num"="1");

引入依赖

   <dependency><groupId>org.apache.doris</groupId><artifactId>flink-doris-connector-1.16</artifactId><version>24.0.0</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-core</artifactId><version>1.17.0</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients</artifactId><version>1.17.0</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>1.17.0</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka</artifactId><version>1.17.0</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-common</artifactId><version>1.17.0</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-java</artifactId><version>1.17.0</version></dependency>

主程序

package flink;import org.apache.doris.flink.cfg.DorisExecutionOptions;
import org.apache.doris.flink.cfg.DorisOptions;
import org.apache.doris.flink.cfg.DorisReadOptions;
import org.apache.doris.flink.sink.DorisSink;
import org.apache.doris.flink.sink.writer.serializer.SimpleStringSerializer;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.types.DataType;import java.util.Properties;public class DorisWrite {public static void main(String[] args) throws Exception {Properties props = new Properties();//Kafka broker的地址props.put("bootstrap.servers", "192.168.235.130:9092");props.put("group.id", "test");props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("auto.offset.reset", "latest");StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.enableCheckpointing(5000);env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);//指定消费的主题FlinkKafkaConsumer<String> flinkKafkaConsumer = new FlinkKafkaConsumer<>("k2gg",new SimpleStringSchema(),props);DorisSink.Builder<String> builder = DorisSink.builder();DorisOptions.Builder dorisBuilder = DorisOptions.builder();//Doris的地址以及账号密码等信息dorisBuilder.setFenodes("192.168.235.130:8030").setTableIdentifier("test.transactions").setUsername("root").setPassword("1445413748");Properties pro = new Properties();pro.setProperty("format", "json");pro.setProperty("read_json_by_line", "true");DorisExecutionOptions  executionOptions = DorisExecutionOptions.builder().setLabelPrefix("label-doris12"+System.currentTimeMillis()) //streamload label prefix,.setStreamLoadProp(pro).build();builder.setDorisReadOptions(DorisReadOptions.builder().build()).setDorisExecutionOptions(executionOptions).setSerializer(new SimpleStringSerializer()) //serialize according to string.setDorisOptions(dorisBuilder.build());DataStreamSource<String> dataStreamSource = env.addSource(flinkKafkaConsumer);// 将Kafka数据转换为JSON格式DataStream<String> jsonStream = dataStreamSource.map(new MapFunction<String, String>() {@Overridepublic String map(String value) throws Exception {System.out.println("value"+value);// 分割字符串String[] parts = value.split(",");// 创建JSON字符串StringBuilder jsonString = new StringBuilder();jsonString.append("{");jsonString.append("\"timestamp\":\"").append(parts[0]).append("\",");jsonString.append("\"user_id\":").append(parts[1]).append(",");jsonString.append("\"transaction_type\":\"").append(parts[2]).append("\",");jsonString.append("\"amount\":").append(parts[3]).append(",");jsonString.append("\"currency\":\"").append(parts[4]).append("\",");jsonString.append("\"status\":\"").append(parts[5]).append("\",");jsonString.append("\"description\":\"").append(parts[6].replace("\"", "")).append("\"");jsonString.append("}");return jsonString.toString();}});jsonStream.print();jsonStream.sinkTo(builder.build());env.execute("flink kafka to doris by datastream");}
}

运行主程序通过Flink消费Kafka的信息写入doris
在这里插入图片描述log日志的信息
在这里插入图片描述
登录Doris进行验证

mysql -h k8s-master -P 9030 -uroot -p

这是没运行主程序之前doris的数据,没有2024-10-15这一天的数据。

select * from transactions where date(timestamp) = "2024-10-15";

在这里插入图片描述
运行主程序之后,Flink将Kafka主题的信息实时写入Doris。
在这里插入图片描述

总结

1.Filebeat格式问题
Filebeat采集日志格式会添加一些自带的额外信息,一般情况下只需要message里面的字段信息,那么yaml文件配置processors和codec属性即可。processors处理只保留 message 的字段信息,其他字段将被丢弃,codec用于定义数据的编码格式,将 message 字段的值作为字符串发送到 kafka,这样就可以保留日志信息的原始格式发送到Kafka。
2.Flink消费Kafka失败
Flink在消费Kafka主题的过程中,不要往该主题发送其他格式的数据,否则会解析失败,尽量新建一个新主题来接收Filebeat采集过来的日志信息。如果还是执行失败,可以尝试在setLabelPrefix添加一个时间戳,这样保证每次生成的标签前缀都不一样,这是因为客户端会生成一个唯一的标签来标识这次导入Doris的操作,Doris服务器会根据这个标签来跟踪导入的进度和状态,如果导入过程中出现问题,Doris会保留失败的数据,客户端就可以通过标签重新导入这些数据。
3.实时写入Doris失败
Flink处理字段的数据类型要与Doris匹配,可以参考官方文档Doris 和 Flink 列类型映射关系。

相关文章:

Flink消费Kafka实时写入Doris

本文模拟实际生产环境&#xff0c;通过FileBeat采集日志信息到Kafka&#xff0c;再通过Flink消费Kafka实时写入Doris。 文章目录 Filebeat采集日志到KafkaFlink消费Kafka实时写入Doris总结 Filebeat采集日志到Kafka 常见的日志采集工具有以下几种&#xff1a;Flume、Logstash和…...

实现Web QQ音乐打开现有新标签页切换音乐

若没有打开播放音乐标签页&#xff0c;则打开新标签页播放所选音乐如果已打开新标签页&#xff0c;则直接切换所选音乐 pageA.vue <script setup lang"ts"> const tab2 ref<any>(null); const router useRouter();interface Track {id: number;name: …...

从底层结构开始学习FPGA(15)----时钟结构(通俗版)

目录 0、前言 1、IO Bank和Clock Region(时钟区域)是一个东西吗? 2、时钟输入管脚 3、时钟架构 3.1、全局时钟BUFG 3.2、水平时钟BUFH 3.3、IO时钟BUFIO 3.4、区域时钟BUFR/BUFMR 4、总结 《从底层结构开始学习FPGA》目录与传送门 0、前言 我思来想去,总觉…...

MacOS Sublime Text 解决中乱码

1. 安装Package Control 官方安装指南 手动安装 通过以此点击菜单 Sublime Text > Preferences > Browse Packages 打开Packages目录找到Packages的同级目录Installed Packages下载PackageControl.sublime-package并保存到Installed Packages中在菜单 Sublime Text &g…...

Python画笔案例-084 绘制 3D立方体

1、绘制 3D立方体 通过 python 的turtle 库绘制 3D立方体,如下图: 2、实现代码 绘制 3D立方体,以下为实现代码: import turtle import timeviewfactor = 150 xshift = 0 yshift = 0 zshift = 50...

“八股文”面试:助力、阻力还是空谈?

在当今的IT行业&#xff0c;面试程序员时提及“八股文”已成为一种普遍现象。所谓“八股文”&#xff0c;通常指的是一系列固定的、标准化的面试问题及其解答&#xff0c;这些问题往往涵盖了计算机科学和软件工程的基础知识&#xff0c;以及一些流行的技术框架和算法。然而&…...

如何实现弹出式窗口

文章目录 1 概念介绍2 使用方法3 示例代码我们在上一章回中介绍了Sliver综合示例相关的内容,本章回中将介绍PopupMenuButton组件.闲话休提,让我们一起Talk Flutter吧。 1 概念介绍 我们在本章回中介绍的PopupMenuButton组件位于AppBar右侧,通常显示三个圆点图标,点击该图标…...

Lua 函数

Lua 函数 Lua 是一种轻量级的编程语言&#xff0c;广泛用于游戏开发、脚本编写和其他应用程序中。在 Lua 中&#xff0c;函数是一等公民&#xff0c;这意味着它们可以被存储在变量中&#xff0c;作为参数传递给其他函数&#xff0c;以及作为其他函数的返回值。本文将详细介绍 …...

HTML_文本标签

概念&#xff1a; 1、用于包裹&#xff1a;词汇、短语等。 2、通常写在排版标签里面。 3、排版标签更宏观(大段的文字)&#xff0c;文本标签更微观(词汇、短语)。 4、文本标签通常都是行内元素。 常用的文本标签 标签名 全称 标签语义em Emphasized 加重(文本)。要着重阅…...

基于SpringBoot+Vue+uniapp的诗词学习系统的详细设计和实现(源码+lw+部署文档+讲解等)

详细视频演示 请联系我获取更详细的演示视频 项目运行截图 技术框架 后端采用SpringBoot框架 Spring Boot 是一个用于快速开发基于 Spring 框架的应用程序的开源框架。它采用约定大于配置的理念&#xff0c;提供了一套默认的配置&#xff0c;让开发者可以更专注于业务逻辑而不…...

健康睡眠的重要性

在快节奏的现代生活中&#xff0c;健康养生已成为人们日益关注的话题&#xff0c;而睡眠&#xff0c;这一看似平凡却至关重要的生理需求&#xff0c;往往被忽视在忙碌的缝隙中。今天&#xff0c;让我们深入探讨健康养生中的睡眠艺术&#xff0c;它不仅关乎身体的休息与恢复&…...

知道ip地址怎么看网络地址

在计算机网络的世界里&#xff0c;IP地址是设备之间通信的基础。然而&#xff0c;仅仅知道一个设备的IP地址并不足以完全理解它在网络中的位置和作用。网络地址&#xff0c;作为IP地址的一个重要组成部分&#xff0c;为我们提供了关于设备所属网络的更多信息。本文将深入探讨如…...

精心整理85道Java微服务面试题(含答案)

微服务 面试题 1、您对微服务有何了解&#xff1f; 2、微服务架构有哪些优势&#xff1f; 3。微服务有哪些特点&#xff1f; 4、设计微服务的最佳实践是什么&#xff1f; 5、微服务架构如何运作&#xff1f; 6、微服务架构的优缺点是什么&#xff1f; 7、单片&#xff0…...

MongoDB聚合管道(Aggregation Pipeline)

聚合管道&#xff08;Aggregation Pipeline&#xff09;是MongoDB中用于对数据进行处理和分析的一种强大机制。它由一系列的阶段&#xff08;Stage&#xff09;组成&#xff0c;每个阶段对输入的数据进行一种特定的操作&#xff0c;然后将结果传递给下一个阶段&#xff0c;就像…...

移情别恋c++ ദ്ദി˶ー̀֊ー́ ) ——6.vector(无习题)

C 中的 vector 容器详细总结 1. 什么是 vector&#xff1f; vector 是 C 标准模板库 (STL) 中的一种动态数组容器。它的底层实现是一个可以自动扩展的数组&#xff0c;支持随机访问和动态调整大小&#xff0c;是 C 中最常用的序列容器之一。vector 在插入、删除、遍历以及随机…...

SpringBoot技术支持的桂林景点导航

2相关技术 2.1 MYSQL数据库 MySQL是一个真正的多用户、多线程SQL数据库服务器。 是基于SQL的客户/服务器模式的关系数据库管理系统&#xff0c;它的有点有有功能强大、使用简单、管理方便、安全可靠性高、运行速度快、多线程、跨平台性、完全网络化、稳定性等&#xff0c;非常…...

利用vmware在移动硬盘安装Ubuntu2go

安装 买个移动硬盘&#xff0c;usb插电脑&#xff0c;磁盘管理看磁盘序列号 vmware新建虚拟机 这一步选择磁盘管理里面看到的磁盘4 先不要开机&#xff0c;选择设置里面UEFI 和安装正常Ubuntu一致操作即可&#xff0c;这里可以不选高级&#xff0c;默认一个引导分区&…...

Spring Boot:中小型医院网站的敏捷开发

摘 要 本基于Spring Boot的中小型医院网站设计目标是实现用户网络预约挂号的功能&#xff0c;同时提高医院管理效率&#xff0c;更好的为广大用户服务。 本文重点阐述了中小型医院网站的开发过程&#xff0c;以实际运用为开发背景&#xff0c;基于Spring Boot框架&#xff0c;运…...

241011-在jupyter中实现文件夹压缩后下载

241011-在jupyter中实现文件夹压缩后下载 在使用jupyter notebook过程中&#xff0c;我们经常会遇到成堆的文件无法批量下载的问题&#xff0c;这里提供压缩文件夹代码&#xff0c;压缩后即可右键文件选择download实现批量下载 import zipfile import os# 设置你想要压缩的文…...

.NET 一款用于转储指定进程内存的工具

01阅读须知 此文所提供的信息只为网络安全人员对自己所负责的网站、服务器等&#xff08;包括但不限于&#xff09;进行检测或维护参考&#xff0c;未经授权请勿利用文章中的技术资料对任何计算机系统进行入侵操作。利用此文所提供的信息而造成的直接或间接后果和损失&#xf…...

Splunk 修补关键漏洞,包括远程代码执行漏洞

领先的数据分析和安全监控平台 Splunk 发布了一系列安全更新&#xff0c;以解决 Splunk Enterprise 和 Splunk Cloud Platform 中的多个漏洞。这些漏洞的严重程度不一&#xff0c;有些可实现远程代码执行&#xff08;RCE&#xff09;&#xff0c;有些则允许低权限用户访问敏感信…...

搭建一个vue3+vite框架

可以使用以下两种搭建方式 通过create-vue搭建vue3 项目&#xff08;建议使用&#xff09; create-vue create-vue 是 Vue.js 官方推荐的用于快速启动 Vite 驱动的 Vue 项目的脚手架工具。它简化了创建新 Vue 项目的过程&#xff0c;提供了预配置的项目结构&#xff0c;并集…...

【含文档】基于Springboot+Vue的公交管理系统(含源码+数据库+lw)

1.开发环境 开发系统:Windows10/11 架构模式:MVC/前后端分离 JDK版本: Java JDK1.8 开发工具:IDEA 数据库版本: mysql5.7或8.0 数据库可视化工具: navicat 服务器: SpringBoot自带 apache tomcat 主要技术: Java,Springboot,mybatis,mysql,vue 2.视频演示地址 3.功能 系统定…...

自闭症儿童能否适应学校生活:提供专业辅助,助力顺利融入

自闭症&#xff0c;这一复杂的神经发育障碍&#xff0c;往往让许多家庭在孩子的教育问题上倍感焦虑。面对即将步入学校生活的自闭症儿童&#xff0c;家长们不禁要问&#xff1a;他们能否适应学校生活&#xff1f;如何帮助他们顺利融入&#xff1f;幸运的是&#xff0c;随着医疗…...

MQTTnet.Server同时支持mqtt及websocket协议

Net6后写法 Net6前写法 Program.cs using Microsoft.AspNetCore.Hosting; using Microsoft.Extensions.Configuration; using Microsoft.Extensions.Hosting; using MQTTnet.AspNetCore; using System; using System.IO;namespace MQTTnet.Server {public class Program{publ…...

【数据结构】二叉树(一)遍历

导言 前面以及有了堆的基础&#xff0c;现在来学习二叉树。二叉树的学习和前面的数据结构很不一样&#xff0c;前面我们主要学习用数据结构储存数据&#xff0c;以及实际手搓数据结构的增删查改&#xff1b;而学习二叉树主要是为我们以后学搜索二叉树以及后面的AVL树等数据结构…...

【C++ 贪心】1616. 分割两个字符串得到回文串|1868

本文涉及知识点 C贪心 LeetCode1616. 分割两个字符串得到回文串 给你两个字符串 a 和 b &#xff0c;它们长度相同。请你选择一个下标&#xff0c;将两个字符串都在 相同的下标 分割开。由 a 可以得到两个字符串&#xff1a; aprefix 和 asuffix &#xff0c;满足 a aprefi…...

识别秒拨风险的具体方法及策略

秒拨技术是利用家用宽带拨号上网&#xff08;PPPoE&#xff09;的特性&#xff0c;通过频繁断线重连来获取新的IP地址&#xff0c;从而构建代理服务或进行其他网络活动。这种技术使得IP地址的切换频率极高&#xff0c;加大了识别和追踪的难度。因此&#xff0c;首先需要对秒拨技…...

[Python]如何在Ubuntu中建置python venv虛擬環境,並安裝TensorFlow和OpenCV函式庫?

為了在樹莓派上實現物件影像辨識功能&#xff0c;同時不影響樹莓派原來的python運行環境&#xff0c;選擇建置python虛擬環境[Note1]是一個好方式&#xff0c;其可避免版本衝突和不同運行環境的問題。另外&#xff0c;一併在該虛擬環境中安裝TensorFlow[Note2]和OpenCV[Note3]等…...

Excel:Cells(Rows.Count, 1).End(xlUp).Row和Cells(Rows.Count, 1).End(xlUp)有什么区别

Cells(Rows.Count, 1).End(xlUp).Row 和 Cells(Rows.Count, 1).End(xlUp) 是 VBA 中用于定位 Excel 工作表中单元格的两种不同用法。以下是它们的区别&#xff1a; 1. Cells(Rows.Count, 1).End(xlUp).Row 功能: 这个表达式返回的是一个行号&#xff08;Long 类型&#xff09…...