当前位置: 首页 > news >正文

大数据学习之分布式数据采集系统Flume学习

分布式数据采集系统Flume学习

一、Flume架构

1.1 Hadoop业务开发流程

hadoop业务处理流程

1.2 Flume概述

flume是一个分布式、可靠、和高可用的海量日志采集、聚合和传输的系统。

支持在日志系统中定制各类数据发送方,用于收集数据;

同时,Flume提供对数据进行简单处理,并写到各种数据接受方(比如文本、HDFS、Hbase等)的能力 。

flume的数据流由**事件(Event)**贯穿始终。

事件是Flume的基本数据单位,它携带日志数据(字节数组形式)并且携带有头信息,这些Event由Agent外部的Source生成,当Source捕获事件后会进行特定的格式化,然后Source会把event推入(单个或多个)Channel中。你可以把Channel看作是一个缓冲区,它将保存事件直到Sink处理完该事件。Sink负责持久化日志或者把事件推向另一个Source。

Event的概念:

flume的核心是把数据从数据源(source)收集过来,在将收集到的数据由目的地(sink)所拉取。为了保证输送的过程一定成功,在送到目的地(sink)之前,会先缓存数据(channel),待数据真正到达目的地(sink)后,flume再删除自己缓存的数据。
在整个数据的传输的过程中,流动的是event,即事务保证是在event级别进行的。那么什么是event呢?—–event将传输的数据进行封装,是flume传输数据的基本单位,如果是文本文件,通常是一行记录,event也是事务的基本单位。event从source,流向channel,再到sink,本身为一个字节数组,并可携带headers(头信息)信息。event代表着一个数据的最小完整单元,从外部数据源来,向外部的目的地去。

简单理解:event信息就是flume收集到的数据

image-20220615225417865

Flume 运行的核心是 Agent。Flume以agent为最小的独立运行单位。一个agent就是一个JVM。

它是一个完整的数据收集工具,含有三个核心组件,分别是source、 channel、 sink。

通过这些组件, Event 可以从一个地方流向另一个地方,如下图所示。

image-20220615225229222

flume之所以这么神奇,是源于它自身的一个设计,这个设计就是agent,agent本身是一个java进程,运行在日志收集节点—所谓日志收集节点就是服务器节点。
agent里面包含3个核心的组件:source—->channel—–>sink,类似生产者、仓库、消费者的架构。
source:source组件是专门用来收集数据的,可以处理各种类型、各种格式的日志数据,包括avro、thrift、exec、jms、spooling directory、netcat、sequence generator、syslog、http、legacy、自定义。
channel:source组件把数据收集来以后,临时存放在channel中,即channel组件在agent中是专门用来存放临时数据的——对采集到的数据进行简单的缓存,可以存放在memory、jdbc、file等等。
sink:sink组件是用于把数据发送到目的地的组件,目的地包括hdfs、logger、avro、thrift、ipc、file、null、hbase、solr、自定义。

1.2.1 Source

Source是数据的收集端,负责将数据捕获后进行特殊的格式化,将数据封装到事件(event) 里,然后将事件推入Channel中。 Flume提供了很多内置的Source, 支持 Avro, log4j, syslog 和 http post(body为json格式)。可以让应用程序同已有的Source直接打交道,如AvroSource
如果内置的Source无法满足需要, Flume还支持自定义Source

image-20220615225948482

Source支持的类型

image-20220615230032981

1.2.2 Channel

Channel是连接Source和Sink的组件,大家可以将它看做一个数据的缓冲区(数据队列),它可以将事件暂存到内存中也可以持久化到本地磁盘(或支持jdbc的数据库中)上, 直到Sink处理完该事件。介绍两个较为常用的Channel, MemoryChannel和FileChannel。

Channel支持的类型

image-20220615230122092

1.2.3 Sink

Sink从Channel中取出事件,然后将数据发到别处,可以向文件系统、数据库、 hadoop存数据, 也可以是其他agent的Source。在日志数据较少时,可以将数据存储在文件系统中,并且设定一定的时间间隔保存数据。

image-20220615230204194

1.3 Flume运行机制

Flume 的核心是把数据从数据源收集过来,再送到目的地。为了保证输送一定成功,在送到目的地之前,会先缓存数据,待数据真正到达目的地后,删除自己缓存的数据

Flume 传输的数据的基本单位是 Event,如果是文本文件,通常是一行记录,这也是事务的基本单位。 Event 从 Source,流向 Channel,再到 Sink,本身为一个 byte 数组,并可携带 headers 信息。 Event 代表着一个数据流的最小完整单元,从外部数据源来,向外部的目的地去。

值得注意的是,Flume提供了大量内置的Source、Channel和Sink类型。不同类型的Source,Channel和Sink可以自由组合。组合方式基于用户设置的配置文件,非常灵活。

比如:Channel可以把事件暂存在内存里,也可以持久化到本地硬盘上。Sink可以把日志写入HDFS, HBase,甚至是另外一个Source等等。Flume支持用户建立多级流,

也就是说,多个agent可以协同工作。

1.4 Flume可靠性

Flume 使用事务性的方式保证传送Event整个过程的可靠性。 Sink 必须在Event 已经被传达到下一站agent里,又或者,已经被存入外部数据目的地之后,才能把 Event 从 Channel 中 remove 掉。这样数据流里的 event 无论是在一个 agent 里还是多个 agent 之间流转,都能保证可靠,因为以上的事务保证了 event 会被成功存储起来。比如 Flume支持在本地保存一份channel文件作为备份,而memory channel 将event存在内存 queue 里,速度快,但丢失的话无法恢复。

1.5 flume的广义用法(多个agent顺序连接)

可以将多个Agent顺序连接起来,将最初的数据源经过收集,存储到最终的存储系统中。这是最简单的情况,一般情况下,应该控制这种顺序连接的
Agent 的数量,因为数据流经的路径变长了,如果不考虑failover的话,出现故障将影响整个Flow上的Agent收集服务。

image-20220615230439884

二、Flume的安装(解压即安装)

1、上传至虚拟机,并解压

tar -zxvf apache-flume-1.11.0-bin.tar.gz -C /usr/local/soft/

在环境变量中增加如下命令,可以使用 soft 快速切换到 /usr/local/soft

alias soft=‘cd /usr/local/soft/’

2、重命名目录,并配置环境变量

mv apache-flume-1.9.0-bin/ flume-1.9.0
vim /etc/profile
source /etc/profile

3、查看flume版本

flume-ng version
[root@master soft]# flume-ng version
Flume 1.9.0
Source code repository: https://git-wip-us.apache.org/repos/asf/flume.git
Revision: d4fcab4f501d41597bc616921329a4339f73585e
Compiled by fszabo on Mon Dec 17 20:45:25 CET 2018
From source with checksum 35db629a3bda49d23e9b3690c80737f9
[root@master soft]# 

三、使用案例

在使用之前,提供一个大致思想,使用Flume的过程是确定scource类型,channel类型和sink类型,编写conf文件并开启服务,在数据捕获端进行传入数据流入到目的地。

案例一、从控制台打入数据,在控制台显示

1、确定scource类型,channel类型和sink类型

确定的使用类型分别是,netcat source, memory channel, logger sink.

2、编写conf文件(文件中内容书写的顺序不做要求)

#a代表agent的名称,r1代表source的名称。c1代表channel名称,k1代表的是sink的名称
#声明各个组件
a.sources=r1
a.channels=c1
a.sinks=k1#定义source类型,这里是使用netcat的类型:监控流经一个端口的数据(此时需设置一个端口号),将每一个文本行数据作为EVENT的输入
a.sources.r1.type=netcat
a.sources.r1.bind=192.168.128.100
a.sources.r1.port=8888
#定义source发送的下游channel
a.sources.r1.channels=c1#定义channel:将数据存储到内存中
a.channels.c1.type=memory
#缓存的数据条数
a.channels.c1.capacity=1000
#事务数据量
a.channels.c1.transactionCapacity=1000#定义sink的类型,确定上游channel
a.sinks.k1.channel=c1
a.sinks.k1.type=logger
#a代表agent的名称,r1代表source的名称。c1代表channel名称,k1代表的是sink的名称
#声明各个组件
a1.sources=r1
a1.channels=c1
a1.sinks=k1a1.sources.r1.type = netcat
a1.sources.r1.bind = 192.168.128.100
a1.sources.r1.port = 12345a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 10000a1.sinks.k1.type = logger# 组装
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

3、开启服务,我们重新开启复制一个客户端进行开启服务

命令: 注意 -n 后面跟着的是你在conf文件中定义好的,-f 后面跟着的是编写conf文件的路径

-- Dflume.root.logger=DEBUG,console	:指定日志
[root@master scrips]# flume-ng agent -n a1 -c /usr/local/soft/flume-1.11.0/conf -f ./netcat2logger.conf -Dflume.root.logger=DEBUG,console	

4、在另一个客户端输入命令:

注意:这里的master和8888是在conf文件中设置好的ip地址和端口

在输入第二个命令的窗口中输入数据,回车,在服务端就会接收到数据。

yum install -y telnet
telnet master 12345
退出:ctrl+],然后再按 qnetstat -tunlp |grep 端口号netstat -tunlp |grep 12345监控日志文件,查看输入结果
日志文件在脚本编写运行的目录下

在这里插入图片描述

image-20220615233156185

案例二、从本地指定路径中打入数据到HDFS

1、同样,我们需要先确定scource类型,channel类型和sink类型

我们确定使用的类型分别是,spooldir source(监控指定目录内的数据变更), memory channel, hdfs sink

2、编写conf文件

create external table students_flume
(id bigint,name string,age int,gender string,clazz string
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ',';LOCATION '/bigdata30/flumeout2/log_s'; // 必选,指定列分隔符 (已做修改)
a1.sources = r1
a1.channels = c1
a1.sinks = k1#指定spooldir的属性
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /usr/local/soft/bigdata30/flumedata1
#时间拦截器 : 获取数据到达event的时间戳,将其放入event中。在最后给文件命名时会加上时间
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = timestamp#指定channel
a1.channels.c1.type = memory
#暂存的条数
a1.channels.c1.capacity = 10000
#每次sink取的条数
a1.channels.c1.transactionCapacity = 1000#指定sink的类型
a1.sinks.k1.type = hdfs
#指定hdfs的集群地址和路径,路径如果没有创建会自动创建
a1.sinks.k1.hdfs.path = hdfs://master:9000/bigdata30/flumeout2/log_s2
#指定hdfs路径下生成的文件的前缀
a1.sinks.k1.hdfs.filePrefix = log_%Y-%m-%d
#手动指定hdfs最小备份
a1.sinks.k1.hdfs.minBlockReplicas=1
#设置数据传输类型
a1.sinks.k1.hdfs.fileType = DataStream
#如果参数为0,不按照条数生成文件。如果参数为n,就是按照n条生成一个文件
a1.sinks.k1.hdfs.rollCount = 100
#这个参数是hdfs下文件sink的数据size。每sink 32MB的数据,自动生成一个文件
a1.sinks.k1.hdfs.rollSize =0
#每隔n 秒 将临时文件滚动成一个目标文件。如果是0,就不按照时间进行生成目标文件。
a1.sinks.k1.hdfs.rollInterval =0
a1.sinks.k1.hdfs.idleTimeout=0#组装
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1传入的文件为csv文件,若是text???
执行脚本:
flume-ng agent -n a1 -c /usr/local/soft/flume-1.11.0/conf -f ./sqoopldir2hdfs.conf -Dflume.root.logger=DEBUG,console
a1.sources = r1
a1.channels = c1
a1.sinks = k1#指定spooldir的属性
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /usr/local/soft/bigdata30/flumedata2
#时间拦截器
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = timestamp#指定channel
a1.channels.c1.type = memory
#暂存的条数
a1.channels.c1.capacity = 10000
#每次sink取的条数
a1.channels.c1.transactionCapacity = 1000#指定sink的类型
a1.sinks.k1.type = hive
a1.sinks.k1.hive.metastore = thrift://192.168.128.100:9083
a1.sinks.k1.hive.database = bigdata30_test
a1.sinks.k1.hive.table = students_flume
a1.sinks.k1.serializer = DELIMITED
a1.sinks.k1.serializer.delimiter = ","
a1.sinks.k1.serializer.serdeSeparator = ','
a1.sinks.k1.serializer.fieldnames =id,name,age,gender,clazz#组装
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

3、开启服务

[root@master scrips]# flume-ng agent -n a1 -c ../../flume/conf -f ./linux2hive.conf -Dflume.root.logger=DEBUG, console执行出错:
一直卡在下述界面
原因:新版本0各种包错误,不要轻易尝试新版本。可以使用先将数据传到hdfs中,再再hive中创建表指定这个hdfs目录,将数据映射到hive中

类似案例

create external table bigdata30_test.students_flume_test
(id bigint,name string,num int,num1 int
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
location '/bigdata30/teachers';

4、将文件复制到指定的目录下

cp DIANXIN.csv /usr/local/soft/flumedata/
cp students.csv ./flumedata1 

image-20220615234801009

课堂穿插案例:手动打数据到hive表(无法执行)

a1.sources = r1
a1.sinks = k1
a1.channels = c1#指定spooldir的属性
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /usr/local/soft/flumedata4
#时间拦截器
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = timestamp#指定sink的类型
a1.sinks.k1.type = hdfs
#指定hdfs的集群地址和路径,路径如果没有创建会自动创建
a1.sinks.k1.hdfs.path = hdfs://master:9000/user/hive/warehouse/bigdata30.db/students_flume
#指定hdfs路径下生成的文件的前缀
a1.sinks.k1.hdfs.filePrefix = students_test
#手动指定hdfs最小备份
a1.sinks.k1.hdfs.minBlockReplicas=1
#设置数据传输类型
a1.sinks.k1.hdfs.fileType = DataStream
#如果参数为0,不按照条数生成文件。如果参数为n,就是按照n条生成一个文件
a1.sinks.k1.hdfs.rollCount = 1000
#这个参数是hdfs下文件sink的数据size。每sink 32MB的数据,自动生成一个文件
a1.sinks.k1.hdfs.rollSize =0
#每隔n 秒 将临时文件滚动成一个目标文件。如果是0,就不按照时间进行生成目标文件。
a1.sinks.k1.hdfs.rollInterval =0
a1.sinks.k1.hdfs.idleTimeout=0
#每次从channel中取出的条数
a1.sinks.k1.hdfs.batchSize=1000 #指定channel
a1.channels.c1.type = memory
#暂存的条数
a1.channels.c1.capacity = 10000
#每次sink取的条数
a1.channels.c1.transactionCapacity = 1000#组装
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
a1.sources = r1
a1.sinks = k1
a1.channels = c1#指定spooldir的属性
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /usr/local/soft/flumedata3
#时间拦截器
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = timestampa1.sinks.k1.type = hive
a1.sinks.k1.hive.metastore = thrift://192.168.128.100:9083
a1.sinks.k1.hive.database = bigdata30
a1.sinks.k1.hive.table = students_flume
a1.sinks.k1.hive.partition = asia,%{country},%Y-%m-%d-%H-%M
a1.sinks.k1.useLocalTimeStamp = false
a1.sinks.k1.round = true
a1.sinks.k1.roundValue = 10
a1.sinks.k1.roundUnit = minute
a1.sinks.k1.serializer = DELIMITED
a1.sinks.k1.serializer.delimiter = ","
a1.sinks.k1.serializer.serdeSeparator = ','
a1.sinks.k1.serializer.fieldnames =id,name,age,gender,clazz#指定channel
a1.channels.c1.type = memory#组装
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

案例三、从java代码中进行捕获打入到HDFS

1、先确定scource类型,channel类型和sink类型

确定的三个组件的类型是,avro source, memory channel, hdfs sink

2、打开maven项目,添加依赖

            <!-- https://mvnrepository.com/artifact/org.apache.flume/flume-ng-core --><dependency><groupId>org.apache.flume</groupId><artifactId>flume-ng-core</artifactId><version>1.9.0</version></dependency><dependency><groupId>org.apache.flume.flume-ng-clients</groupId><artifactId>flume-ng-log4jappender</artifactId><version>1.9.0</version></dependency>

3、设置log4J的内容

log4j.rootLogger=INFO,stdout,flumelog4j.appender.stdout = org.apache.log4j.ConsoleAppender
log4j.appender.stdout.Target = System.out
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout 
log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} [%t] [%c] [%p] - %m%nlog4j.appender.flume = org.apache.flume.clients.log4jappender.Log4jAppender
log4j.appender.flume.Hostname = 192.168.230.50
log4j.appender.flume.Port = 41414
log4j.appender.flume.UnsafeMode = true
log4j.appender.flume.layout=org.apache.log4j.PatternLayout 
log4j.appender.flume.layout.ConversionPattern=%m%n

编写java代码(示例,可以修改logger打印的内容)

package com.shujia.log2flume;import org.apache.log4j.Logger;import java.text.SimpleDateFormat;
import java.util.Date;public class LoggerToFlume {public static void main(String[] args) throws InterruptedException {//创建一个logger对象Logger logger = Logger.getLogger(LoggerToFlume.class.getName());//创建一个日期格式化对象SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");//写一个死循环while (true) {Date date = new Date();logger.info("dateToBigdata17: " + sdf.format(date));//让线程休眠一会儿Thread.sleep(1000);}}
}

4、编写conf文件

#定义agent名, source、channel、sink的名称
a.sources = r1
a.channels = c1
a.sinks = k1#具体定义source
a.sources.r1.type = avro
a.sources.r1.bind = 192.168.128.100
a.sources.r1.port = 12345#具体定义channel
a.channels.c1.type = memory
a.channels.c1.capacity = 10000
a.channels.c1.transactionCapacity = 10#具体定义sink
a.sinks.k1.type = hdfs
a.sinks.k1.hdfs.path =hdfs://master:9000/bigdata30/flumeout3/flume_hdfs_avro2
a.sinks.k1.hdfs.filePrefix = events-
a.sinks.k1.hdfs.minBlockReplicas=1
a.sinks.k1.hdfs.fileType = DataStream
#不按照条数生成文件
a.sinks.k1.hdfs.rollCount = 10
a.sinks.k1.hdfs.batchSize = 10
a.sinks.k1.hdfs.rollSize = 0
#每隔N s将临时文件滚动成一个目标文件
a.sinks.k1.hdfs.rollInterval =0
a.sinks.k1.hdfs.idleTimeout=0 #组装source、channel、sink
a.sources.r1.channels = c1
a.sinks.k1.channel = c1

5、开启服务,命令:

flume-ng agent -n a -c ../conf -f ./avro2hdfs2.conf -Dflume.root.logger=DEBUG,console

6、运行Java代码

image-20220616000313292

7、查看HDFS

image-20220616000329598

image-20220616000336652

案例四、监控HBase日志到Hbase表中(这里可以换成其他组件日志监控)

1、监控日志

提前建好表

 create 'log' , 'cf1'

编写conf文件 hbaselog2hdfs.conf

# a表示给agent命名为a
# 给source组件命名为r1
a.sources = r1
# 给sink组件命名为k1
a.sinks = k1 
# 给channel组件命名为c1
a.channels = c1#指定spooldir的属性
a.sources.r1.type = exec 
a.sources.r1.command = tail -F /usr/local/soft/bigdata30/work_day.txt#指定channel
a.channels.c1.type = memory 
a.channels.c1.capacity = 10000
# 表示sink每次会从channel里取多少数据
a.channels.c1.transactionCapacity = 100#指定sink的类型
#a.sinks.k1.type = hbase
#a.sinks.k1.table = log
#a.sinks.k1.columnFamily = cf1a.sinks.k1.type = hbase2
a.sinks.k1.table = log
a.sinks.k1.columnFamily = cf1
a.sinks.k1.serializer = org.apache.flume.sink.hbase2.RegexHBase2EventSerializer# 组装
a.sources.r1.channels = c1 
a.sinks.k1.channel = c1

运行

flume-ng agent -n a -c ../conf -f ./ hbaselog2hdfs.conf -Dflume.root.logger=DEBUG,console
2、监控自定义的文件

确保test_idoall_org表在hbase中已经存在:

hbase(main):002:0> create 'test_idoall_org','uid','name'
0 row(s) in 0.6730 seconds=> Hbase::Table - test_idoall_org
hbase(main):003:0> put 'test_idoall_org','10086','name:idoall','idoallvalue'
0 row(s) in 0.0960 seconds

2.创建配置文件:

a1.sources = r1
a1.sinks = k1
a1.channels = c1# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /usr/local/soft/flumedata/data.txt# Describe the sink
a1.sinks.k1.type = hbase
a1.sinks.k1.table = test_idoall_org
a1.sinks.k1.columnFamily = name
a1.sinks.k1.serializer = org.apache.flume.sink.hbase.RegexHbaseEventSerializer# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

3.启动flume agent:

flume-ng agent -n a1 -c ../../flume/conf -f ./file2hbase.conf -Dflume.root.logger=DEBUG, console

4.产生数据:

echo "hello idoall.org from flume" >> data.txt

案例五、flume监控Http source

1、先确定scource类型,channel类型和sink类型

确定的三个组件的类型是,http source, memory channel, logger sink.

2、编写conf文件

a1.sources=r1
a1.sinks=k1
a1.channels=c1a1.sources.r1.type=http
a1.sources.r1.port=50000
a1.sources.r1.channels=c1a1.sinks.k1.type=logger
a1.sinks.k1.channel=c1a1.channels.c1.type=memory
a1.channels.c1.capacity=10000
# 表示sink每次会从channel里取多少数据
a1.channels.c1.transactionCapacity=100

3、启动服务

flume-ng agent -n a1 -f ./httpToLogger.conf -Dflume.root.logger=DEBUG,console

4、复制一个窗口进行打数据

curl -X POST -d'[{"headers":{"h1":"v1","h2":"v2"},"body":"hello bigdata"}]'  http://192.168.128.100:50000

image-20220616003631869

案例六、多路复制

image-20220616221022949

flume多路复制案例

1、将flume复制到node1,node2

[root@master soft]# scp -r flume-1.9.0 node1:`pwd`
[root@master soft]# scp -r flume-1.9.0 node2:`pwd`

2、在node1节点的/usr/local/soft/bigdata17/scripts 下新建配置文件:

vim netcat-flume-loggers.conf

添加如下内容

a1.sources = r1
a1.channels = c1
a1.sinks = k1a1.sources.r1.type = avro
a1.sources.r1.bind = node1
a1.sources.r1.port = 4141a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100a1.sinks.k1.type = loggera1.sources.r1.channels = c1
a1.sinks.k1.channel = c1flume-ng agent -n a1 -c /usr/local/soft/flume-1.11.0/conf -f ./avro2logger.conf -Dflume.root.logger=DEBUG,console

3、在node2节点的 /usr/local/soft/bigdata17/scripts 下新建配置文件:

vim netcat-flume-loggers.conf

添加如下内容:

a4.sources = r4
a4.channels = c4
a4.sources.r4.type = avro
a4.sources.r4.channels = c4
a4.sources.r4.bind = node2
a4.sources.r4.port = 4141a4.channels.c4.type = memory
a4.channels.c4.capacity = 1000
a4.channels.c4.transactionCapacity = 100a4.sinks = k4
a4.sinks.k4.type = logger
a4.sinks.k4.channel = c4

4、在master节点的 /usr/local/soft/bigdata17/scrips 下新建配置文件:

vim netcat-flume-loggers.conf

添加如下内容

a2.sources = r1
a2.channels = c1 c2
a2.sinks = k1 k2# Describe/configure the source
a2.sources.r1.type = netcat
a2.sources.r1.bind = master
a2.sources.r1.port = 44444# Use a channel which buffers events in memory
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100# Use a channel which buffers events in memory
a2.channels.c2.type = memory
a2.channels.c2.capacity = 1000
a2.channels.c2.transactionCapacity = 100# Describe the sink
a2.sinks.k1.type = avro
a2.sinks.k1.hostname = node1
a2.sinks.k1.port = 4141a2.sinks.k2.type = avro
a2.sinks.k2.hostname = node2
a2.sinks.k2.port = 4141# Bind the source and sink to the channel
a2.sources.r1.channels = c1 c2
a2.sinks.k1.channel = c1
a2.sinks.k2.channel = c2

三台服务器的配置文件建好了,现在就可以启动flume集群了:

先启动node1和node2节点的logger服务端:

flume-ng agent -n a3 -c ../../flume-1.9.0/conf -f ./netcat-flume-loggers.conf -Dflume.root.logger=INFO,console
flume-ng agent -n a4 -c ../../flume-1.9.0/conf -f ./netcat-flume-loggers.conf -Dflume.root.logger=INFO,console

启动master节点的netcat:

flume-ng agent -n a2 -c ../../flume-1.9.0/conf -f ./netcat-flume-loggers.conf -Dflume.root.logger=INFO,console

开启netcat后此窗口就不能操作了,再新建一个master窗口启动telnet:

telnet master 44444

master上输入数据:

image-20220616223124670

node1和node2接收数据:

image-20220616223150948

image-20220616223202087

案例七、故障转移

image-20220616223934024

故障转移

Flume支持使用将多个sink逻辑上分到一个sink组,sink组配合不同的SinkProcessor可以实现负载均衡和错误恢复的功能。这里的故障,指的是Sink故障

1)通过sinkgroups里priority属性配置的权重来决定哪台的优先级高,同一时间只能有一台机器工作

2)当当前的sink挂掉后切换为standby模式(假设优先级10),并立刻切换到另一台(假设优先级9),当sink修复好重新启动后,隔段时间会恢复使用优先级为10的sink

3)遇到故障时,我们要立即修复

master:

vim guzhang.conf
a1.sources = r1
a1.channels = c1
a1.sinks = k1 k2 a1.sources.r1.type = netcat
a1.sources.r1.bind = master
a1.sources.r1.port = 4444a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 100#将数据写到另一台Flume服务器上
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = node1
a1.sinks.k1.port = 5555#将数据写到另一台Flume服务器上
a1.sinks.k2.type = avro
a1.sinks.k2.hostname = node2
a1.sinks.k2.port = 6666#使用sink processor来控制channel的数据流向
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2  
a1.sinkgroups.g1.processor.type = failover
a1.sinkgroups.g1.processor.priority.k1 = 5
a1.sinkgroups.g1.processor.priority.k2 = 10a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c1

node1

a3.sources = r3
a3.channels = c3
a3.sources.r3.type = avro
a3.sources.r3.channels = c3
a3.sources.r3.bind = node1
a3.sources.r3.port = 5555a3.channels.c3.type = memory
a3.channels.c3.capacity = 1000
a3.channels.c3.transactionCapacity = 100a3.sinks = k3
a3.sinks.k3.type = logger
a3.sinks.k3.channel = c3

node2

a4.sources = r4
a4.channels = c4
a4.sources.r4.type = avro
a4.sources.r4.channels = c4
a4.sources.r4.bind = node2
a4.sources.r4.port = 6666a4.channels.c4.type = memory
a4.channels.c4.capacity = 1000
a4.channels.c4.transactionCapacity = 100a4.sinks = k4
a4.sinks.k4.type = logger
a4.sinks.k4.channel = c4

先启动node1,node2上的

flume-ng agent -n a3 -c ../../flume-1.9.0/conf -f ./guzhang.conf -Dflume.root.logger=INFO,console
flume-ng agent -n a4 -c ../../flume-1.9.0/conf -f ./guzhang.conf -Dflume.root.logger=INFO,console

再启动master的

flume-ng agent -n a1 -c ../../flume-1.9.0/conf -f ./guzhang.conf -Dflume.root.logger=INFO,console

master输入数据

telnet master 4444

image-20220616224619686

数据会打到node2

image-20220616224629191

将node2手动关闭,再输入数据,这时候数据打到node1

image-20220616224725045

image-20220616224733386

再将node2启动起来,再输入数据,这时候,node2继续接收

image-20220616224848627

image-20220616224859467

案例八、负载均衡

通过将sinkprocessor里的type属性来控制processor模式,分别是(负载均衡load_balance、故障转移failover)

使用负载均衡以后,channel会轮训分配任务,减少机器负荷

master上的配置文件:(随机的)

a1.sources = r1
a1.channels = c1
a1.sinks = k1 k2 a1.sources.r1.type = netcat
a1.sources.r1.bind = master
a1.sources.r1.port = 4444a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 100a1.sinks.k1.type = avro
a1.sinks.k1.hostname = node1
a1.sinks.k1.port = 5555a1.sinks.k2.type = avro
a1.sinks.k2.hostname = node2
a1.sinks.k2.port = 6666a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = load_balance
a1.sinkgroups.g1.processor.backoff = true
a1.sinkgroups.g1.processor.selector = randoma1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c1

案例九、聚合

node1、node2两台日志服务机器实时生产日志主要类型为access.log、nginx.log、web.log 现在要求:

把node1、node2机器中的access.log、nginx.log、web.log 采集汇总到master机器上然后统一收集到hdfs中。 但是在hdfs中要求的目录为:

/shujia/bigdata17/flumelogs/access/20220616/** 
/shujia/bigdata17/flumelogs/nginx/20180616/** 
/shujia/bigdata17/flumelogs/web/20180616/**

场景分析:

聚合案例流程

数据流程处理分析:

聚合flume详细流程

实现:

node1对应的IP为 192.168.40.120
node2对应的IP为 192.168.40.130
master对应的IP为 192.168.40.110

node1和node2上配置文件

[root@node2 bigdata17]# mkdir -p /usr/local/soft/bigdata30/scrips/taillogs[root@node2 bigdata17]# touch /usr/local/soft/bigdata30/scrips/taillogs/access.log
[root@node2 bigdata17]# touch /usr/local/soft/bigdata30/scrips/taillogs/nginx.log
[root@node2 bigdata17]# touch /usr/local/soft/bigdata30/scrips/taillogs/web.log
vim exec_source_avro_sink.conf
# Name the components on this agent 
a1.sources = r1 r2 r3 
a1.channels = c1 
a1.sinks = k1 # Describe/configure the source 
a1.sources.r1.type = exec 
a1.sources.r1.command = tail -F /usr/local/soft/bigdata30/scrips/taillogs/access.log 
# static拦截器的功能就是往采集到的数据的header中插入自己定义的key-value对 
a1.sources.r1.interceptors = i1 
a1.sources.r1.interceptors.i1.type = static 
a1.sources.r1.interceptors.i1.key = type 
a1.sources.r1.interceptors.i1.value = access a1.sources.r2.type = exec 
a1.sources.r2.command = tail -F /usr/local/soft/bigdata30/scrips/taillogs/nginx.log 
a1.sources.r2.interceptors = i2 
a1.sources.r2.interceptors.i2.type = static 
a1.sources.r2.interceptors.i2.key = type 
a1.sources.r2.interceptors.i2.value = nginx a1.sources.r3.type = exec 
a1.sources.r3.command = tail -F /usr/local/soft/bigdata30/scrips/taillogs/web.log 
a1.sources.r3.interceptors = i3 
a1.sources.r3.interceptors.i3.type = static 
a1.sources.r3.interceptors.i3.key = type 
a1.sources.r3.interceptors.i3.value = web # Describe the sink 
a1.sinks.k1.type = avro 
a1.sinks.k1.hostname = master 
a1.sinks.k1.port = 41414 # Use a channel which buffers events in memory 
a1.channels.c1.type = memory 
a1.channels.c1.capacity = 20000 
a1.channels.c1.transactionCapacity = 10000 # Bind the source and sink to the channel 
a1.sources.r1.channels = c1 
a1.sources.r2.channels = c1 
a1.sources.r3.channels = c1 
a1.sinks.k1.channel = c1

在master上面开发flume配置文件

vim avro_source_hdfs_sink.conf
a1.sources = r1 
a1.sinks = k1 
a1.channels = c1 # 定义source 
a1.sources.r1.type = avro 
a1.sources.r1.bind = master 
a1.sources.r1.port =41414 
# 添加时间拦截器 
a1.sources.r1.interceptors = i1 
a1.sources.r1.interceptors.i1.type = timestamp# 定义channels 
a1.channels.c1.type = memory 
a1.channels.c1.capacity = 20000 
a1.channels.c1.transactionCapacity = 10000 # 定义sink 
a1.sinks.k1.type = hdfs 
a1.sinks.k1.hdfs.path=hdfs://master:9000/bigdata30/flumelogs/%{type}/%Y%m%d 
a1.sinks.k1.hdfs.filePrefix = events 
a1.sinks.k1.hdfs.fileType = DataStream 
a1.sinks.k1.hdfs.writeFormat = Text 
# 时间类型 
a1.sinks.k1.hdfs.useLocalTimeStamp = true 
# 生成的文件不按条数生成 
a1.sinks.k1.hdfs.rollCount = 0 
# 生成的文件按时间生成 
a1.sinks.k1.hdfs.rollInterval = 30 
# 生成的文件按大小生成 
a1.sinks.k1.hdfs.rollSize = 10485760 
# 批量写入hdfs的个数 
a1.sinks.k1.hdfs.batchSize = 10000 
# flume操作hdfs的线程数(包括新建,写入等) 
a1.sinks.k1.hdfs.threadsPoolSize=10 
# 操作hdfs超时时间
a1.sinks.k1.hdfs.callTimeout=30000 # 组装source、channel、sink 
a1.sources.r1.channels = c1 
a1.sinks.k1.channel = c1

采集端文件生成脚本
在node1与node2上面开发shell脚本,模拟数据生成 server.sh

# !/bin/bash while true dodate >> /usr/local/soft/bigdata30/scrips/taillogs/access.log; date >> /usr/local/soft/bigdata30/scrips/taillogs/web.log; date >> /usr/local/soft/bigdata30/scrips/taillogs/nginx.log; sleep 0.5; 
done

顺序启动服务
master启动flume实现数据收集

flume-ng agent -n a1 -c ../../flume-1.9.0/conf -f ./avro_source_hdfs_sink.conf -Dflume.root.logger=INFO,console

node1与node2启动flume实现数据监控

 flume-ng agent -n a1 -c ../../flume-1.9.0/conf -f ./exec_source_avro_sink.conf -Dflume.root.logger=INFO,console

node1与node2启动生成文件脚本

sh server.sh

案例十、ChannelSelector案例

ChannelSelector的作用就是选出Event将要被发往哪个Channel。其共有两种类型,分别是Replicating(复制)和Multiplexing(多路复用)。

ReplicatingSelector会将同一个Event发往所有的Channel,Multiplexing会根据相应的原则,将不同的Event发往不同的Channel。默认是Replicating

  1. Multiplexing类型的ChannelSelector会根据Event中Header中的某个属性决定分发到哪个Channel。
  2. 每个event里的header默认是没有值的,所以,multiplexing类型的ChannelSelector一般会配合自定义拦截器使用

replicating类型例子:

a1.sources = r1
a1.channels = c1 c2 # 如果有100个Event,那么c1和c2中都会有这100个事件a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100

multiplexing类型的ChannelSelector例子:

a1.sources = r1
a1.channels = c1 c2a1.sources.source1.selector.type = multiplexing
a1.sources.source1.selector.header = title # 以header中的title对应的值作为条件
a1.sources.source1.selector.mapping.a = c2 # 如果header中title的值为a,使用c2这个channel
a1.sources.source1.selector.mapping.b = c1 # 如果header中title的值为b,使用c1这个channel
a1.sources.source1.selector.default = c1 # 默认使用c1这个channel
SinkProcessor

SinkProcessor共有三种类型,分别是DefaultSinkProcessor、LoadBalancingSinkProcessor和FailoverSinkProcessor

DefaultSinkProcessor对应的是单个的Sink,LoadBalancingSinkProcessor和FailoverSinkProcessor对应的是Sink Group,LoadBalancingSinkProcessor可以实现负载均衡的功能,FailoverSinkProcessor可以错误恢复的功能。

自定义Interceptor

使用Flume采集服务器本地日志,需要按照日志类型的不同,将不同种类的日志发往不同的分析系统。

需求:

在该案例中,我们以端口数据模拟日志,模拟不同类型的日志,我们需要自定义interceptor区分内容是否包含shujia,将其分别发往不同的分析系统(Channel)。

实现代码

package com.shujia.log2flume;import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;import java.util.List;
import java.util.Map;/***  1. 如何自定义拦截器?*   flume的自定义拦截器需要实现Flume提供的Interceptor接口.**  实现抽象方法:*      initialize: 完成一些初始化工作.*      close: 完成一些善后的工作*      intercept:拦截器的核心处理方法.  拦截的逻辑.*          intercept(Event event) : 单个event的拦截处理*          intercept(List<Event> events): 批次event的拦截处理**  2. 拦截器的对象如何实例化?*    在拦截器中定义一个static的内部类,实现Flume提供的Builder接口**   实现抽象方法:*      build : 用于构建拦截器对象*      configure:用于读取配置信息(xxxx.conf)****/
public class LogDataInterceptor implements Interceptor {@Overridepublic void initialize() {}/*判断变成event的数据中是否包含shujia字符串,event==>header({}) + body(数据)如果包含,给event中的header中添加一个key-value: name/title/key ===  sj如果不包含,给event中的header中添加一个key-value: name/title/key ===  nsj*/@Overridepublic Event intercept(Event event) {//如何取出event中的header和body呢?//    Map<String, String> getHeaders();//    void setHeaders(Map<String, String> var1);//    byte[] getBody();Map<String, String> headers = event.getHeaders();String body = new String(event.getBody());//判断body是否包含shujiaif(body.contains("shujia")){headers.put("title","sj");}else {headers.put("title","nsj");}return event;}@Overridepublic List<Event> intercept(List<Event> list) {for (Event event : list) {intercept(event);}return list;}@Overridepublic void close() {}public static class MyBuilder implements Builder{@Overridepublic Interceptor build() {return new LogDataInterceptor();}@Overridepublic void configure(Context context) {}}
}

引入依赖

<dependency><groupId>org.apache.flume</groupId><artifactId>flume-ng-core</artifactId><version>1.9.0</version>
</dependency>

将代码打成jar包

将jar包放在flume的lib目录下。简单暴力,但是不方便管理

配置文件

1.进阶案例 - channel选择器 - 多路
a3 ==> a3.confa3.sources = r1
a3.channels = c1
a3.sinks = k1 a3.sources.r1.type = avro
a3.sources.r1.bind = node2
a3.sources.r1.port = 6666a3.channels.c1.type = memory
a3.channels.c1.capacity = 10000
a3.channels.c1.transactionCapacity = 100a3.sinks.k1.type = loggera3.sources.r1.channels = c1
a3.sinks.k1.channel = c1 a2 ==> a2.conf
a2.sources = r1
a2.channels = c1
a2.sinks = k1a2.sources.r1.type = avro
a2.sources.r1.bind = node1
a2.sources.r1.port = 5555a2.channels.c1.type = memory
a2.channels.c1.capacity = 10000
a2.channels.c1.transactionCapacity = 100a2.sinks.k1.type =loggera2.sources.r1.channels = c1
a2.sinks.k1.channel = c1a1 ==> a1.conf
a1.sources = r1
a1.channels = c1 c2
a1.sinks = k1 k2 a1.sources.r1.type = netcat
a1.sources.r1.bind = master
a1.sources.r1.port = 4444#将选择器类型改为multiplexing分发
a1.sources.r1.selector.type = multiplexing
#检测每个event里head的title key
a1.sources.r1.selector.header = type
#如果title的值为at,吧event发到channel c1里,如果为ot,发到channel c2里,如果都不匹配,默认发到c2里
a1.sources.r1.selector.mapping.sj = c1
a1.sources.r1.selector.mapping.nsj = c2
a1.sources.r1.selector.default=c2
#给拦截器命名i1
a1.sources.r1.interceptors = i1
#这里写自定义类的全类名
a1.sources.r1.interceptors.i1.type = interceptor.ShuJiaInterceptor$MyBuilder
# 组装channel与source
a1.sources.r1.channels = c1 c2 a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 100a1.channels.c2.type = memory
a1.channels.c2.capacity = 10000
a1.channels.c2.transactionCapacity = 100a1.sinks.k1.type = avro
a1.sinks.k1.hostname = node1
a1.sinks.k1.port = 5555a1.sinks.k2.type = avro
a1.sinks.k2.hostname = node2
a1.sinks.k2.port = 6666a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2

启动

先启动node1和node2上面的flume

flume-ng agent -n a2 -c ../../flume-1.9.0/conf -f ./a2.conf -Dflume.root.logger=INFO,console
flume-ng agent -n a3 -c ../../flume-1.9.0/conf -f ./a3.conf -Dflume.root.logger=INFO,console

最后启动master上面的flume

flume-ng agent -n a1 -c ../../flume-1.9.0/conf -f ./a3.conf -Dflume.root.logger=INFO,console

相关文章:

大数据学习之分布式数据采集系统Flume学习

分布式数据采集系统Flume学习 一、Flume架构 1.1 Hadoop业务开发流程 1.2 Flume概述 flume是一个分布式、可靠、和高可用的海量日志采集、聚合和传输的系统。 支持在日志系统中定制各类数据发送方&#xff0c;用于收集数据; 同时&#xff0c;Flume提供对数据进行简单处理&…...

谁用谁夸,为什么BI零售数据分析方案这么受欢迎?

在当今数字化时代&#xff0c;零售行业竞争激烈&#xff0c;如何快速准确地获取数据洞察&#xff0c;成为企业制胜的关键。奥威BI零售数据分析方案&#xff0c;凭借其全面、高效、智能的特点&#xff0c;赢得了广大零售企业的青睐&#xff0c;成为市场上的热门选择。 奥威BI零…...

多路h265监控录放开发-(14)通过PaintCell自定义日历控件继承QCalendarWidget的XCalendar类

首先创建一个新类XCalendar继承QCalendarWidget类&#xff0c;然后在UI视图设计器中把日历提升为XCalendar&#xff0c;通过这个函数自己设置日历的样式 xcalendar.h #pragma once #include <QCalendarWidget> class XCalendar :public QCalendarWidget { public:XCal…...

安卓速度下载v1.0.5/聚合短视频解析下载

功能特色 短视频下载与高级管理 – 支持短视频下载&#xff0c;为您提供一系列高级视频管理功能包括视频内容提取、智能防重复技术、视频体积压缩以及视频转换成GIF图片等&#xff1b; 磁-力链接下载升级 – 现支持磁力链接下载&#xff0c;实现边下载边播放的便捷体验&#x…...

从赛题切入谈如何学习数学建模

1.引言 &#xff08;1&#xff09;今天学习了这个汪教授的这个视频&#xff0c;主要是对于一个赛题的介绍讲解&#xff0c;带领我们通过这个赛题知道数学建模应该学习哪些技能&#xff0c;以及这个相关的经验&#xff0c;我感觉这个还是让我自己受益匪浅的 &#xff08;2&…...

江山欧派杯2024全国华佗五禽戏线上线下观摩交流比赛在亳州开幕

6月28日&#xff0c;2024全国华佗五禽戏线上线下观摩交流比赛在安徽省亳州市开幕。 此次比赛是由安徽省亳州市文化旅游体育局和安徽省非物质文化遗产保护中心主办、亳州市华佗五禽戏协会&#xff08;国家级非遗华佗五禽戏保护单位&#xff09;和亳州市传统华佗五禽戏俱乐部&…...

怪兽充电一季度由盈转亏:营收大幅下滑,消费者投诉不断

《港湾商业观察》施子夫 日常生活中&#xff0c;关于移动充电宝“乱扣费”、“充电功率低”的吐槽之声不绝于耳。而原先风靡一时的共享充电宝也被不少网友吐槽为“充电刺客”&#xff0c;足以可见共享充电宝虽作为大众使用频率较高的移动电源产品&#xff0c;但负面评价声音同…...

激光与相机融合标定汇总:提升融合算法的精度与可靠性(附github地址)

前言 随着科技的飞速发展&#xff0c;激光技术与相机技术的融合已成为推动智能化影像发展的重要力量。这种融合不仅提高了成像的精度和效率&#xff0c;还为相关行业带来了革命性的变革。在这篇博客中&#xff0c;我们将深入探讨激光与相机融合标定的原理及其在各个领域的应用…...

市场拓展招聘:完整指南

扩大招聘业务会给你带来很多挑战&#xff0c;更不用说你已经在处理的问题了。助教专业人士每周花近13个小时为一个角色寻找候选人。此外&#xff0c;客户的需求也在不断变化&#xff0c;招聘机构之间的竞争也在加剧。毫无疑问&#xff0c;对增长有战略的方法会有很大的帮助。一…...

Leetcode 501:二叉搜索树中的众数

给你一个含重复值的二叉搜索树&#xff08;BST&#xff09;的根节点 root &#xff0c;找出并返回 BST 中的所有 众数&#xff08;即&#xff0c;出现频率最高的元素&#xff09;。 如果树中有不止一个众数&#xff0c;可以按 任意顺序 返回。 假定 BST 满足如下定义&#xf…...

esp8266 GPIO

功能综述 ESP8266 的 16 个通⽤ IO 的管脚位置和名称如下表所示。 管脚功能选择 功能选择寄存器 PERIPHS_IO_MUX_MTDI_U&#xff08;不同的 GPIO&#xff0c;该寄存器不同&#xff09; PIN_FUNC_SELECT(PERIPHS_IO_MUX_MTDI_U,FUNC_GPIO12);PERIPHS_IO_MUX_为前缀。后面的…...

ingress相关yaml文件报错且相关资源一切正常解决方法

今天在执行ingress相关文件的时候莫名其妙报错了&#xff0c;问了别人得知了这个方法 执行ingress相关文件报错 01.yaml是我自己创建关于ingress的yaml文件 报错信息 且相关资源一切正常 解决方法 kubectl get validatingwebhookconfigurations删除ingress-nginx-admissio…...

重要通知:据最新TEMU要求所有欧区车灯都需要能效标签(eu energy lable)

重要通知&#xff1a; 据最新TEMU要求&#xff0c;所有“欧区车灯”都需要能效标签&#xff08;eu energy lable&#xff09;&#xff0c;目前已下架欧区站点&#xff0c;上传成功后可恢复。 灯具类欧盟EU ENERGY LABEL 近日有不少欧洲站卖家收到TEMU平台商品要求卖家们发布的…...

JAVA SDK 整合 AI 大语言模型

目前主流模型厂商的 SDK 并没有很好的支持 JAVA 环境&#xff0c;主流还是使用的 Python &#xff0c;如果希望将 AI 功能集成到业务中来&#xff0c;则需要找找有没有一些现成的开源项目&#xff0c;但是这种项目一般需要谨慎使用&#xff0c;以防有偷取 app_key 等风险问题 前…...

【Apache Doris】如何实现高并发点查?(原理+实践全析)

【Apache Doris】如何实现高并发点查&#xff1f;&#xff08;原理实践全析&#xff09; 一、背景说明二、原理介绍三、环境信息四、Jmeter初始化五、参数预调六、用例准备七、高并发实测八、影响因素九、总结 本文主要分享 Apache Doris 是如何实现高并发点查的&#xff0c;以…...

解决SpringMVC使用MyBatis-Plus自定义MyBaits拦截器不生效的问题

自定义MyBatis拦截器 如果是SpringBoot项目引入Component注解就生效了&#xff0c;但是SpringMVC不行 import lombok.extern.slf4j.Slf4j; import org.apache.ibatis.executor.parameter.ParameterHandler; import org.apache.ibatis.executor.statement.StatementHandler; i…...

Swagger与RESTful API

1. Swagger简介 在现代软件开发中&#xff0c;RESTful API已成为应用程序间通信的一个标准。这种架构风格通过使用标准的HTTP方法来执行网络上的操作&#xff0c;简化了不同系统之间的交互。API&#xff08;应用程序编程接口&#xff09;允许不同的软件系统以一种预定义的方式…...

MySQL84 -- ERROR 1524 (HY000): Plugin ‘msql_native_password‘ is not loaded.

【问题描述】 MySQL 8.4版本&#xff0c;配置用户使用mysql_native_password认证插件验证用户身份&#xff0c;报错&#xff1a; 【解决方法】&#xff08;Windows, MySQL 8.4) 1、修改MySQL配置文件my.ini&#xff0c;在[mysqld]段添加mysql_native_passwordON。 2、管理员…...

将Excel中的错误值#N/A替换成心仪的字符串,瞬间爱了……

常用表格的人都晓得&#xff0c;看到满屏悦动的#N/A&#xff0c;心情都会不好。把它替换成自己心仪的字符&#xff0c;瞬间就爱了。 (笔记模板由python脚本于2024年06月13日 19:32:37创建&#xff0c;本篇笔记适合常用Excel&#xff0c;喜欢数据的coder翻阅) 【学习的细节是欢悦…...

AI大模型日报#0628:谷歌开源9B 27B版Gemma2、AI首次实时生成视频、讯飞星火4.0发布

导读&#xff1a;AI大模型日报&#xff0c;爬虫LLM自动生成&#xff0c;一文览尽每日AI大模型要点资讯&#xff01;目前采用“文心一言”&#xff08;ERNIE-4.0-8K-latest&#xff09;生成了今日要点以及每条资讯的摘要。欢迎阅读&#xff01;《AI大模型日报》今日要点&#xf…...

【随笔】提高代码学习水平(以更高的视角看事物)

最近&#xff0c;我感觉到自己的代码水平似乎卡在了一个瓶颈。似乎只想着数仓&#xff0c;Hive&#xff0c;Spark技术优化&#xff0c;但只要稍微离开这几个点&#xff0c;我就感到无所适从。我开始反思&#xff0c;或许&#xff0c;我应该总结一下自己的学习方法。 1.站的高&…...

游戏AI的创造思路-技术基础-深度学习(5)

继续深度学习技术的探讨&#xff0c;填坑不断&#xff0c;头秃不断~~~~~ 目录 3.5. 自编码器&#xff08;AE&#xff09; 3.5.1. 定义 3.5.2. 形成过程 3.5.3. 运行原理 3.5.3.1.运行原理及基本框架 3.5.3.2. 示例代码 3.5.4. 优缺点 3.5.5. 存在的问题和解决方法 3.5…...

基于SpringBoot养老院管理系统设计和实现(源码+LW+调试文档+讲解等)

&#x1f497;博主介绍&#xff1a;✌全网粉丝10W,CSDN作者、博客专家、全栈领域优质创作者&#xff0c;博客之星、平台优质作者、专注于Java、小程序技术领域和毕业项目实战✌&#x1f497; &#x1f31f;文末获取源码数据库&#x1f31f;感兴趣的可以先收藏起来&#xff0c;还…...

餐饮点餐的简单MySQL集合

ER图 模型图&#xff08;没有进行排序&#xff0c;混乱&#xff09; DDL和DML /* Navicat MySQL Data TransferSource Server : Mylink Source Server Version : 50726 Source Host : localhost:3306 Source Database : schooldbTarget Server Type …...

STM32驱动-ads1112

汇总一系列AD/DA的驱动程序 ads1112.c #include "ads1112.h" #include "common.h"void AD5726_Init(void) {GPIO_InitTypeDef GPIO_InitStructure;RCC_APB2PeriphClockCmd( RCC_APB2Periph_GPIOA | RCC_APB2Periph_GPIOC, ENABLE );//PORTA、D时钟使能 G…...

数据结构与算法高频面试题

初级面试题及详细解答 当涉及到数据结构与算法的初级面试题时&#xff0c;通常涉及基本的数据结构操作、算法复杂度分析和基本算法的应用。 1. 什么是数组&#xff1f;数组和链表有什么区别&#xff1f; 解答&#xff1a; 数组&#xff1a;是一种线性数据结构&#xff0c;用…...

uni-app的showModal提示框,进行删除的二次确认,可自定义确定或取消操作

实现效果&#xff1a; 此处为删除的二次确认示例&#xff0c;点击删除按钮时出现该提示&#xff0c;该提示写在js script中。 实现方式&#xff1a; 通过uni.showModal进行提示&#xff0c;success为确认状态下的操作自定义&#xff0c;此处调用后端接口进行了删除操作&#…...

5款提高工作效率的免费工具推荐

SimpleTex SimpleTex是一款用于创建和编辑LaTeX公式的简单工具。它能够识别图片中的复杂公式并将其转换为可编辑的数据格式。该软件提供了一个直观的界面&#xff0c;用户可以在编辑LaTeX代码的同时实时预览公式的效果&#xff0c;无需额外的编译步骤。此外&#xff0c;SimpleT…...

区块链的技术架构:节点、网络和数据结构

区块链技术听起来很高大上&#xff0c;但其实它的核心架构并不难理解。今天我们就用一些简单的例子和有趣的比喻&#xff0c;来聊聊区块链的技术架构&#xff1a;节点、网络和数据结构。 节点&#xff1a;区块链的“细胞” 想象一下&#xff0c;区块链就像是一个大型的组织&a…...

pdfmake不能设置表格边框颜色?

找到pdfmake>build>pdfmake.js中&#xff1a; 找到定义的“TableProcessor.prototype.drawVerticalLine”和“TableProcessor.prototype.drawHorizontalLine”两个方法&#xff1a; 重新定义borderColor: var borderColor this.tableNode.table.borderColor||"#…...

laravel 使用RabbitMQ作为消息中间件

先搞定环境&#xff0c;安装amqp扩展 确保已安装rabbitmq-c-dev。 比如 可以使用apk add rabbmit-c-dev安装 cd ~ wget http://pecl.php.net/get/amqp-1.10.2.tgz tar -zxf amqp-1.10.2.tgz cd amqp-1.10.2 phpize ./configure make && make install cd ~ rm -rf am…...

web项目打包成可以离线跑的exe软件

目录 引言打开PyCharm安装依赖创建 Web 应用运行应用程序打包成可执行文件结语注意事项 引言 在开发桌面应用程序时&#xff0c;我们经常需要将网页集成到应用程序中。Python 提供了多种方法来实现这一目标&#xff0c;其中 pywebview 是一个轻量级的库&#xff0c;它允许我们…...

BFS:队列+树的宽搜

一、二叉树的层序遍历 . - 力扣&#xff08;LeetCode&#xff09; 该题的层序遍历和以往不同的是需要一层一层去遍历&#xff0c;每一次while循环都要知道在队列中节点的个数&#xff0c;然后用一个for循环将该层节点走完了再走下一层 class Solution { public:vector<vec…...

MySQL高级-SQL优化- count 优化 - 尽量使用count(*)

文章目录 1、count 优化2、count的几种用法3、count(*)4、count(id)5、count(profession)6、count(null)7、 count(1) 1、count 优化 MyISAM引擎把一个表的总行数存在了磁盘上&#xff0c;因此执行count&#xff08;*&#xff09;的时候会直接返回这个数&#xff0c;效率很高&a…...

python Flask methods

在 Flask 中&#xff0c;app.route() 装饰器用于定义 URL 路由和与之关联的视图函数。当你想指定某个 URL 可以接受哪些 HTTP 方法时&#xff0c;你可以使用 methods 参数。methods 是一个列表&#xff0c;它可以包含任何有效的 HTTP 方法。 Falsk文章中的描述&#xff1a; 链…...

three.js场景三元素

three.js是一个基于WebGL的轻量级、易于使用的3D库。它极大地简化了WebGL的复杂细节&#xff0c;降低了学习成本&#xff0c;同时提高了性能。 three.js的三大核心元素&#xff1a; 场景&#xff08;Scene&#xff09; 场景是一个三维空间&#xff0c;是所有物品的容器。可以将…...

Spring AOP(面向切面编程)详解

Spring AOP&#xff08;面向切面编程&#xff09;详解 大家好&#xff0c;我是免费搭建查券返利机器人省钱赚佣金就用微赚淘客系统3.0的小编&#xff0c;也是冬天不穿秋裤&#xff0c;天冷也要风度的程序猿&#xff01; 什么是Spring AOP&#xff1f; Spring AOP&#xff08…...

Kafka第一篇——内部组件概念架构启动服务器zookeeper选举以及底层原理

目录 引入 ——为什么分布式系统需要用第三方软件&#xff1f; JMS 对比 组件 架构推演——备份实现安全可靠 &#xff0c; Zookeeper controller的选举 controller和broker底层通信原理 BROKER内部组件 ​编辑 topic创建 引入 ——为什么分布式系统需要用第三方软件&#…...

14、顺时针打印矩阵

题目&#xff1a; 顺时针打印矩阵 描述&#xff1a; 输入一个矩阵&#xff0c;按照从外向里以顺时针的顺序依次打印出每一个数字&#xff0c; 例如&#xff0c; 如果输入如下矩阵&#xff1a; 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 则依次打印出数字&#xff1a;1,2,3,4,8,1…...

毅速丨金属3D打印是制造业转型升级的重要技术

随着科技的进步&#xff0c;金属3D打印技术已成为制造业升级的重要驱动力。它以其独特的优势&#xff0c;正引领着制造业迈向新的未来。 金属3D打印技术的突破&#xff1a; 设计自由。金属3D打印能制造任意形状和结构的零件&#xff0c;为设计师提供了无限的创意空间。 快速制…...

uni-app uni-data-picker级联选择器无法使用和清除选中的值

出现问题&#xff1a; 使用点击右边的叉号按钮无法清除已经选择的uni-data-picker值 解决办法&#xff1a; 在uni-app uni-data-picker使用中&#xff0c;要添加v-model&#xff0c;v-model在官网的示例中没有体现&#xff0c;但若不加则无法清除。 <uni-data-picker v-m…...

构造函数的小白理解

一、实例 using System; using System.Collections; using System.Collections.Generic; using UnityEngine;//定义一个名为Question的类&#xff0c;用于存储问题及相关信息 [Serializable] public class Question {public string questionText;//存储题目文本字段public str…...

招聘,短信与您:招聘人员完整指南

招聘人员面临的最大挑战之一就是沟通和联系候选人。为何?我们可以从以下原因开始&#xff1a;候选人通常被太多的招聘人员包围&#xff0c;试图联系他们&#xff0c;这使得你很难吸引他们的注意。在招聘过程的不同阶段&#xff0c;根据不同的工作量&#xff0c;让申请人保持最…...

JAVA-矩阵置零

给定一个 m x n 的矩阵&#xff0c;如果一个元素为 0 &#xff0c;则将其所在行和列的所有元素都设为 0 。请使用 原地 算法。 思路&#xff1a; 找到0的位置&#xff0c;把0出现的数组的其他值夜置为0 需要额外空间方法&#xff1a; 1、定义两个布尔数组标记二维数组中行和列…...

[信号与系统]模拟域中的一阶低通滤波器和二阶滤波器

前言 不是学电子出身的&#xff0c;这里很多东西是问了朋友… 模拟域中的一阶低通滤波器传递函数 模拟域中的一阶低通滤波器的传递函数可以表示为&#xff1a; H ( s ) 1 s ω c H(s) \frac{1}{s \omega_c} H(s)sωc​1​ 这是因为一阶低通滤波器的设计目标是允许低频信…...

Mac环境 aab包转apks,并安装apks

一、下载下载bundletool工具 Releases google/bundletool GitHub 二、将下载bundletool.jar包、aab、keystore文件全部放到同一个目录下 例如我全部放到download目录下 转换命令行&#xff1a; java -jar bundletool-all-1.16.0.jar build-apks --modeuniversal --bundle…...

银河麒麟V10 SP1.1操作系统 离线安装 nginx1.21.5、redis 服务

银河麒麟官网地址&#xff1a;国产操作系统、麒麟操作系统——麒麟软件官方网站 一、查看系统版本 命令&#xff1a;nkvers 我的是 release V10 (SP1)&#xff0c;根据这个版本去官网找对应的rpm包 银河麒麟操作系统的rpm包必须从官方找&#xff0c; 要是随便找个Centos的rp…...

ios swift5 视频播放 播放视频失败 无法播放HEVC (H.265) 格式的视频 H.264格式的可以播放

文章目录 1.问题2.原因&#xff1a;iOS swift AVPlayerViewController无法播放HEVC (H.265) 格式的视频3.解决方法用第三方框架MobileVLCKit来播放4.用MobileVLCKit写的播放器4.1 两个oc版本的4.2 两个swiftUI版本的5.苹果是支持HEVC (H.265) 格式的视频&#xff0c;是硬件那边…...

网工内推 | 网络工程师,IE认证优先,最高18k*14薪,周末双休

01 上海吾索信息科技有限公司 &#x1f537;招聘岗位&#xff1a;网络工程师 &#x1f537;岗位职责&#xff1a; 1&#xff09;具备网络系统运维服务经验以及数据库实施经验&#xff0c;具备网络系统认证相关资质或证书&#xff1b; 2&#xff09;掌握常用各设备的运维巡检…...

【Qt】QMessageBox 各种对话框的默认显示效果

1. 函数原型 void about(QWidget *parent, const QString &title, const QString &text)void aboutQt(QWidget *parent, const QString &title QString())QMessageBox::StandardButton critical(QWidget *parent, const QString &title, const QString &…...

echarts-wordcloud:打造个性化词云库

前言 在当今信息爆炸的时代&#xff0c;如何从海量的文本数据中提取有用的信息成为了一项重要的任务。词云作为一种直观、易于理解的数据可视化方式&#xff0c;被广泛应用于文本分析和可视化领域。本文将介绍一种基于 echarts-wordcloud 实现的词云库&#xff0c;通过其丰富的…...

【Linux】线程——线程的概念、线程的特点、线程的优点和缺点、线程和进程、线程函数的使用

文章目录 Linux线程1. 线程的概念1.1 什么是线程 2. 线程的特点2.1 线程的优点2.2 线程的缺点2.4 线程和进程 3. 线程函数的使用pthread_create() 创建线程pthread_self() 获取线程IDpthread_exit() 线程终止pthread_cancel() 线程取消pthread_join() 线程等待pthread_detach()…...

windows重装系统

一、下载Ventoy工具&#xff0c;制作启动盘 官网地址&#xff1a;https://www.ventoy.net/cn/download.html 电脑插入用来制作系统盘的U盘&#xff0c;建议大小在8G以上。 双击打开刚解压出来的Ventoy2Disk.exe文件。打开界面如图&#xff1a; 确认U盘&#xff0c;如图&am…...

学习React hook API

React hook API useEffect&#xff08;异步执行&#xff09;useLayoutEffect&#xff08;同步执行&#xff09; useEffect&#xff08;异步执行&#xff09; useEffect: 是在浏览器完成绘制后异步执行的; 所以如果你在 useEffect 中改变了 DOM&#xff0c;可能会造成用户看到的…...

Linux部署wordpress站点

先安装宝塔面板 yum install -y wget && wget -O install.sh https://download.bt.cn/install/install_6.0.sh && sh install.sh ed8484bec 因为wordpress需要php&#xff0c;mysql&#xff0c;apache &#xff0c;httpd环境 参考&#xff1a;Linux 安装宝塔…...

零基础实现大模型部署(window平台)_微软大模型小型化部署

这是一个超详细安装教程&#xff0c;介绍了在 Window 电脑上如何部署 Qwen1.5 大模型。本文还涉及到 Python 及其环境的配置。 适合对象&#xff1a;有点后端编程基础&#xff0c;没有 Python 基础。 需要环境&#xff1a;Window10/11&#xff0c;支持 Cuda 的 Nvidia 显卡。…...

那么大的一个车卖24.9万?一起来看24款大众途昂

记得大众途昂刚上市的时候,所有看到这台车的人都会由衷感叹——“这车可真大”,那时的途昂还算是大众旗下偏高端的SUV,售价还要30多万起步。而如今,途昂还是那么大,但地位已不比当年,而且起售价格已经来到了24.9万。随着近年来汽车价格战的愈演愈烈,相比前几年,我们手持…...

城市通勤神器!奔腾小马2.89万元火爆预售

上下班高峰期的通勤,总是让人头疼。坐公交人挤人,路上的耗时更是无法计算;乘地铁相对省时间,但车厢里依然像是“沙丁鱼罐头”。如果遇到刮风下雨等恶劣天气,就更加令人恼火。在这种情况下,很多人都希望能拥有一辆价格便宜、配置够用的代步小车,虽无奢华体验,但求遮风挡…...

sdbusplus:通过文件描述符传递数据

有的时候需要传递大量的数据,如果将数据通过dbus传递,会消耗大量的带宽。可以通过传递一个文件描述符替代传递数据: 以下的service通过文件描述符接收数据: //fd_service.cpp #include <sdbusplus/asio/connection.hpp> #include <sdbusplus/asio/object_server…...

LLAMA3==shenzhi-wang/Llama3-8B-Chinese-Chat。windows安装不使用ollama

创建环境&#xff1a; conda create -n llama3_env python3.10 conda activate llama3_env conda install pytorch torchvision torchaudio cudatoolkit11.7 -c pytorch 安装Hugging Face的Transformers库&#xff1a; pip install transformers sentencepiece 下载模型 ht…...

STM32定时器及输出PWM完成呼吸灯

文章目录 一、STM32定时器原理1、基本定时器2、通用定时器&#xff08;1&#xff09;时钟源&#xff08;2&#xff09;预分频器PSC&#xff08;3&#xff09;计数器CNT&#xff08;4&#xff09;自动装载寄存器ARR 3、高级定时器 二、PWM工作原理三、控制LED以2s的频率周期性地…...

嵌入式单片机笔试题

DC-DC 和 LDO两者有何区别&#xff1f; DC-DC转换器&#xff08;直流-直流转换器&#xff09;和LDO&#xff08;低压差线性稳压器&#xff09;都是用于电源管理的设备&#xff0c;但它们在原理和特性上有一些显著的区别&#xff1a; 原理&#xff1a; DC-DC转换器通过改变输…...