kafka-3-kafka应用的核心要点和内外网访问
kafka实战教程(python操作kafka),kafka配置文件详解
Kafka内外网访问的设置
1 kafka简介
根据官网的介绍,ApacheKafka®是一个分布式流媒体平台,它主要有3种功能:
(1)发布和订阅消息流,这个功能类似于消息队列,这也是kafka归类为消息队列框架的原因。
(2)以容错的方式记录消息流,kafka以文件的方式来存储消息流。
(3)可以在消息发布的时候进行处理。
使用场景:
(1)在系统或应用程序之间构建可靠的用于传输实时数据的管道,消息队列功能。
(2)构建实时的流数据处理程序来变换或处理数据流,数据处理功能。
1.1 kafka生产者
(1)首先,创建ProducerRecord必须包含Topic和Value,key和partition可选。
(2)然后,序列化key和value对象为ByteArray,并发送到网络。
(3)接下来,消息发送到partitioner。
如果创建ProducerRecord时指定了partition,此时partitioner啥也不用做,简单的返回指定的partition即可。
如果未指定partition,partitioner会基于ProducerRecord的key生成partition。
(4)producer选择好partition后,增加record到对应topic和partition的batch record。
(5)最后,专有线程负责发送batch record到合适的Kafka broker。
(6)当broker收到消息时,它会返回一个应答(response)。
如果消息成功写入Kafka,broker将返回RecordMetadata对象(包含topic,partition和offset);
相反,broker将返回error。这时producer收到error会尝试重试发送消息几次,直到producer返回error。
实例化producer后,接着发送消息。
这里主要有3种发送消息的方法:
(1)立即发送:只管发送消息到server端,不care消息是否成功发送。大部分情况下,这种发送方式会成功,因为Kafka自身具有高可用性,producer会自动重试;但有时也会丢失消息;
(2)同步发送:通过send()方法发送消息,并返回Future对象。get()方法会等待Future对象,看send()方法是否成功;
(3)异步发送:通过带有回调函数的send()方法发送消息,当producer收到Kafka broker的response会触发回调函数。
以上所有情况,一定要时刻考虑发送消息可能会失败,想清楚如何去处理异常。
通常我们是一个producer起一个线程开始发送消息。为了优化producer的性能,一般会有下面几种方式:单个producer起多个线程发送消息;使用多个producer。
生产者在向kafka集群发送消息的时候,可以通过指定分区来发送到指定的分区中。
也可以通过指定均衡策略来将消息发送到不同的分区中。
如果不指定,就会采用默认的随机均衡策略,将消息随机的存储到不同的分区中。
1.2 kafka消费者
Consumer即消费者,消费者通过与kafka集群建立长连接的方式,不断地从集群中拉取消息,然后可以对这些消息进行处理。
kafka的消费模式总共有3种:最多一次,最少一次,正好一次。为什么会有这3种模式,是因为客户端处理消息,提交反馈(commit)这两个动作不是原子性。
(1)最多一次:客户端收到消息后,在处理消息前自动提交,这样kafka就认为consumer已经消费过了,偏移量增加。
(2)最少一次:客户端收到消息,处理消息,再提交反馈。这样就可能出现消息处理完了,在提交反馈前,网络中断或者程序挂了,那么kafka认为这个消息还没有被consumer消费,产生重复消息推送。
(3)正好一次:保证消息处理和提交反馈在同一个事务中,即有原子性。
详细阐述如何实现以上三种方式。
(1)At-most-once(最多一次)
设置enable.auto.commit为ture
设置 auto.commit.interval.ms为一个较小的时间间隔.
client不要调用commitSync(),kafka在特定的时间间隔内自动提交。
(2)At-least-once(最少一次)
方法一
设置enable.auto.commit为false
client调用commitSync(),增加消息偏移;方法二
设置enable.auto.commit为ture
设置 auto.commit.interval.ms为一个较大的时间间隔.
client调用commitSync(),增加消息偏移;
(3)Exactly-once(正好一次)
如果要实现这种方式,必须自己控制消息的offset,自己记录一下当前的offset,对消息的处理和offset的移动必须保持在同一个事务中,例如在同一个事务中,把消息处理的结果存到mysql数据库同时更新此时的消息的偏移。
设置enable.auto.commit为false
保存ConsumerRecord中的offset到数据库
当partition分区发生变化的时候需要rebalance,有以下几个事件会触发分区变化。
1 consumer订阅的topic中的分区大小发生变化
2 topic被创建或者被删除
3 consuer所在group中有个成员挂了
4 新的consumer通过调用join加入了group
此时 consumer通过实现ConsumerRebalanceListener接口,捕捉这些事件,对偏移量进行处理。consumer通过调用seek(TopicPartition, long)方法,移动到指定的分区的偏移位置。
当新的消费者加入消费组,它会消费一个或多个分区,而这些分区之前是由其他消费者负责的;另外,当消费者离开消费组(比如重启、宕机等)时,它所消费的分区会分配给其他分区。这种现象称为重平衡(rebalance)。重平衡是Kafka一个很重要的性质,这个性质保证了高可用和水平扩展。
不过也需要注意到,在重平衡期间,所有消费者都不能消费消息,因此会造成整个消费组短暂的不可用。而且,将分区进行重平衡也会导致原来的消费者状态过期,从而导致消费者需要重新更新状态,这段期间也会降低消费性能。后面我们会讨论如何安全的进行重平衡以及如何尽可能避免。
消费者通过定期发送心跳(hearbeat)到一个作为组协调者(group coordinator)的broker来保持在消费组内存活。这个broker不是固定的,每个消费组都可能不同。当消费者拉取消息或者提交时,便会发送心跳。
如果消费者超过一定时间没有发送心跳,那么它的会话(session)就会过期,组协调者会认为该消费者已经宕机,然后触发重平衡。可以看到,从消费者宕机到会话过期是有一定时间的,这段时间内该消费者的分区都不能进行消息消费;通常情况下,我们可以进行优雅关闭,这样消费者会发送离开的消息到组协调者,这样组协调者可以立即进行重平衡而不需要等待会话过期。
在0.10.1版本,Kafka对心跳机制进行了修改,将发送心跳与拉取消息进行分离,这样使得发送心跳的频率不受拉取的频率影响。另外更高版本的Kafka支持配置一个消费者多长时间不拉取消息但仍然保持存活,这个配置可以避免活锁(livelock)。活锁,是指应用没有故障但是由于某些原因不能进一步消费。
在消费者消费消息时,kafka使用offset来记录当前消费的位置。
在kafka的设计中,可以有多个不同的group来同时消费同一个topic下的消息,如图,我们有两个不同的group同时消费,他们的的消费的记录位置offset各不相同,不互相干扰。
对于一个group而言,消费者的数量不应该多于分区的数量,因为在一个group中,每个分区至多只能绑定到一个消费者上,即一个消费者可以消费多个分区,一个分区只能给一个消费者消费。
因此,若一个group中的消费者数量大于分区数量的话,多余的消费者将不会收到任何消息。
1.3 Broker
Kafka是一个高吞吐量分布式消息系统,采用Scala和Java语言编写,它提供了快速、可扩展的、分布式、分区的和可复制的日志订阅服务。它由Producer、Broker、Consumer三部分构成.
Producer向某个Topic发布消息,而Consumer订阅某个Topic的消息。 一旦有某个Topic新产生的消息,Broker会传递给订阅它的所有Consumer,每个Topic分为多个分区,这样的设计有利于管理数据和负载均衡。
Broker:消息中间件处理结点,一个Kafka节点就是一个broker,多个broker可以组成一个Kafka集群。
Controller:中央控制器Control,负责管理分区和副本状态并执行管理着这些分区的重新分配。(里面涉及到partition leader 选举)。
ISR:同步副本组。
谈到kafka的存储,就不得不提到分区,即partitions,创建一个topic时,同时可以指定分区数目,分区数越多,其吞吐量也越大,但是需要的资源也越多,同时也会导致更高的不可用性,kafka在接收到生产者发送的消息之后,会根据均衡策略将消息存储到不同的分区中。
2 Kafka安装与使用
kafka官方下载地址
(1)首先确保你的机器上安装了jdk,kafka需要java运行环境。
(2)以前的kafka还需要zookeeper,新版的kafka已经内置了一个zookeeper环境,所以我们可以直接使用。
2.1 安装jdk
1、解压
sudo tar -xzvf jdk-8u144-linux-x64.tar.gz -C /usr/local/
2、配置环境变量
vi /home/zb/.bashrc
export JAVA_HOME=/usr/local/jdk1.8.0_144
export PATH=$PATH:$JAVA_HOME/bin
3、配置立即生效
source /home/zb/.bashrc
java -version
2.2 安装kafka
1、解压
sudo tar -xzvf kafka_2.13-3.4.0.tgz -C /usr/local/
cd /usr/local/
sudo chmod 777 kafka_2.13-3.4.0
2、配置环境变量
vi /home/zb/.bashrc
export KAFKA_HOME=/usr/local/kafka_2.13-3.4.0
export PATH=$PATH:$KAFKA_HOME/bin
3、配置立即生效
source /home/zb/.bashrc
进行最简单的尝试的话我们只需要解压到任意目录即可。
2.3 配置
在kafka解压目录下下有一个config的文件夹,里面放置的是我们的配置文件。
一、consumer.properites
消费者配置,这个配置文件用于配置开启的消费者,此处我们使用默认的即可。
二、producer.properties
生产者配置,这个配置文件用于配置开启的生产者,此处我们使用默认的即可。
三、server.properties
kafka服务器的配置,此配置文件用来配置kafka服务器,目前仅介绍几个最基础的配置。
1、broker.id
申明当前kafka服务器在集群中的唯一ID,需配置为integer,
并且集群中的每一个kafka服务器的id都应是唯一的,我们这里采用默认配置即可2、listeners
申明此kafka服务器需要监听的端口号,
listeners=PLAINTEXT://myubuntu:9092。
并确保服务器的9092端口能够访问
配置好/etc/hosts的内容
10.0.2.11 myubuntu3、zookeeper.connect
申明kafka所连接的zookeeper的地址 ,需配置为zookeeper的地址,
由于本次使用的是kafka高版本中自带zookeeper,使用默认配置即可
zookeeper.connect=localhost:2181
四、zookeeper.properties
zookeeper配置文件。
2.4 启动
cd /usr/local/kafka_2.13-3.4.0/
(1)启动zookeeper
bin/zookeeper-server-start.sh config/zookeeper.properties
后台方式
nohup ./bin/zookeeper-server-start.sh ./config/zookeeper.properties >> /tmp/zookeeperoutput.log 2>&1 &
(2)启动kafka
bin/kafka-server-start.sh config/server.properties
后台方式
nohup ./bin/kafka-server-start.sh ./config/server.properties >> /tmp/kafkaoutput.log 2>&1 &(3)查看进程
jps
3297 Jps
2137 QuorumPeerMain
2684 Kafka(4)停止kafka
cd /usr/local/kafka_2.13-3.4.0/
停止kafka
kafka-server-stop.sh
停止zookeeper
zookeeper-server-stop.sh
2.5 应用
cd /usr/local/kafka_2.13-3.4.0/
创建topic
bin/kafka-topics.sh --create --bootstrap-server myubuntu:9092 --replication-factor 1 --partitions 1 --topic test查看已经创建的topic
bin/kafka-topics.sh --list --bootstrap-server localhost:9092创建一个消息消费者
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning创建一个消息生产者
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
3 kafka的配置
在kafka/config/目录下面有3个配置文件:
producer.properties producer
consumer.properties consumer
server.properties broker
3.1 BROKER的全局配置
最为核心的三个配置 broker.id、log.dir、zookeeper.connect 。
3.1.1 系统相关
------------------------------------------- 系统 相关 -------------------------------------------
##每一个broker在集群中的唯一标示,要求是正数。在改变IP地址,不改变broker.id的话不会影响consumers
broker.id =1##kafka数据的存放地址,多个地址的话用逗号分割 /tmp/kafka-logs-1,/tmp/kafka-logs-2
log.dirs = /tmp/kafka-logs##提供给客户端响应的端口
port =6667##消息体的最大大小,单位是字节
message.max.bytes =1000000## broker 处理消息的最大线程数,一般情况下不需要去修改
num.network.threads =3## broker处理磁盘IO 的线程数 ,数值应该大于你的硬盘数
num.io.threads =8## 一些后台任务处理的线程数,例如过期消息文件的删除等,一般情况下不需要去做修改
background.threads =4## 等待IO线程处理的请求队列最大数,若是等待IO的请求超过这个数值,那么会停止接受外部消息,算是一种自我保护机制
queued.max.requests =500##broker的主机地址,若是设置了,那么会绑定到这个地址上,若是没有,会绑定到所有的接口上,并将其中之一发送到ZK,一般不设置
host.name## 打广告的地址,若是设置的话,会提供给producers, consumers,其他broker连接,具体如何使用还未深究
advertised.host.name## 广告地址端口,必须不同于port中的设置
advertised.port## socket的发送缓冲区,socket的调优参数SO_SNDBUFF
socket.send.buffer.bytes =100*1024## socket的接受缓冲区,socket的调优参数SO_RCVBUFF
socket.receive.buffer.bytes =100*1024## socket请求的最大数值,防止serverOOM,message.max.bytes必然要小于socket.request.max.bytes,会被topic创建时的指定参数覆盖
socket.request.max.bytes =100*1024*1024
3.1.2 LOG相关
------------------------------------------- LOG 相关 -------------------------------------------
## topic的分区是以一堆segment文件存储的,这个控制每个segment的大小,会被topic创建时的指定参数覆盖
log.segment.bytes =1024*1024*1024## 这个参数会在日志segment没有达到log.segment.bytes设置的大小,也会强制新建一个segment 会被 topic创建时的指定参数覆盖
log.roll.hours =24*7## 日志清理策略 选择有:delete和compact 主要针对过期数据的处理,或是日志文件达到限制的额度,会被 topic创建时的指定参数覆盖
log.cleanup.policy = delete## 数据存储的最大时间 超过这个时间 会根据log.cleanup.policy设置的策略处理数据,也就是消费端能够多久去消费数据
## log.retention.bytes和log.retention.minutes任意一个达到要求,都会执行删除,会被topic创建时的指定参数覆盖
log.retention.minutes=7days指定日志每隔多久检查看是否可以被删除,默认1分钟
log.cleanup.interval.mins=1## topic每个分区的最大文件大小,一个topic的大小限制 = 分区数*log.retention.bytes 。-1没有大小限制
## log.retention.bytes和log.retention.minutes任意一个达到要求,都会执行删除,会被topic创建时的指定参数覆盖
log.retention.bytes=-1## 文件大小检查的周期时间,是否处罚 log.cleanup.policy中设置的策略
log.retention.check.interval.ms=5minutes## 是否开启日志压缩
log.cleaner.enable=false## 日志压缩运行的线程数
log.cleaner.threads =1## 日志压缩时候处理的最大大小
log.cleaner.io.max.bytes.per.second=None## 日志压缩去重时候的缓存空间 ,在空间允许的情况下,越大越好
log.cleaner.dedupe.buffer.size=500*1024*1024## 日志清理时候用到的IO块大小 一般不需要修改
log.cleaner.io.buffer.size=512*1024## 日志清理中hash表的扩大因子 一般不需要修改
log.cleaner.io.buffer.load.factor =0.9## 检查是否处罚日志清理的间隔
log.cleaner.backoff.ms =15000## 日志清理的频率控制,越大意味着更高效的清理,同时会存在一些空间上的浪费,会被topic创建时的指定参数覆盖
log.cleaner.min.cleanable.ratio=0.5## 对于压缩的日志保留的最长时间,也是客户端消费消息的最长时间,同log.retention.minutes的区别在于一个控制未压缩数据,一个控制压缩后的数据。会被topic创建时的指定参数覆盖
log.cleaner.delete.retention.ms =1day## 对于segment日志的索引文件大小限制,会被topic创建时的指定参数覆盖
log.index.size.max.bytes =10*1024*1024## 当执行一个fetch操作后,需要一定的空间来扫描最近的offset大小,设置越大,代表扫描速度越快,但是也更好内存,一般情况下不需要搭理这个参数
log.index.interval.bytes =4096## log文件"sync"到磁盘之前累积的消息条数
## 因为磁盘IO操作是一个慢操作,但又是一个"数据可靠性"的必要手段
## 所以此参数的设置,需要在"数据可靠性"与"性能"之间做必要的权衡.
## 如果此值过大,将会导致每次"fsync"的时间较长(IO阻塞)
## 如果此值过小,将会导致"fsync"的次数较多,这也意味着整体的client请求有一定的延迟.
## 物理server故障,将会导致没有fsync的消息丢失.
log.flush.interval.messages=None## 检查是否需要固化到硬盘的时间间隔
log.flush.scheduler.interval.ms =3000## 仅仅通过interval来控制消息的磁盘写入时机,是不足的.
## 此参数用于控制"fsync"的时间间隔,如果消息量始终没有达到阀值,但是离上一次磁盘同步的时间间隔
## 达到阀值,也将触发.
log.flush.interval.ms = None## 文件在索引中清除后保留的时间 一般不需要去修改
log.delete.delay.ms =60000## 控制上次固化硬盘的时间点,以便于数据恢复 一般不需要去修改
log.flush.offset.checkpoint.interval.ms =60000
3.1.3 TOPIC相关
------------------------------------------- TOPIC 相关 -------------------------------------------
## 是否允许自动创建topic ,若是false,就需要通过命令创建topic
auto.create.topics.enable =true## 一个topic ,默认分区的replication个数 ,不得大于集群中broker的个数
default.replication.factor =1## 每个topic的分区个数,若是在topic创建时候没有指定的话 会被topic创建时的指定参数覆盖
num.partitions =1实例 --replication-factor3--partitions1--topic replicated-topic :名称replicated-topic有一个分区,分区被复制到三个broker上。
3.1.4 复制(Leader、replicas) 相关
----------------------------------复制(Leader、replicas) 相关 ----------------------------------
## partition leader与replicas之间通讯时,socket的超时时间
controller.socket.timeout.ms =30000## partition leader与replicas数据同步时,消息的队列尺寸
controller.message.queue.size=10## replicas响应partition leader的最长等待时间,若是超过这个时间,就将replicas列入ISR(in-sync replicas),并认为它是死的,不会再加入管理中
replica.lag.time.max.ms =10000## 如果follower落后与leader太多,将会认为此follower[或者说partition relicas]已经失效
## 通常,在follower与leader通讯时,因为网络延迟或者链接断开,总会导致replicas中消息同步滞后
## 如果消息之后太多,leader将认为此follower网络延迟较大或者消息吞吐能力有限,将会把此replicas迁移
## 到其他follower中.
## 在broker数量较少,或者网络不足的环境中,建议提高此值.
replica.lag.max.messages =4000##follower与leader之间的socket超时时间
replica.socket.timeout.ms=30*1000## leader复制时候的socket缓存大小
replica.socket.receive.buffer.bytes=64*1024## replicas每次获取数据的最大大小
replica.fetch.max.bytes =1024*1024## replicas同leader之间通信的最大等待时间,失败了会重试
replica.fetch.wait.max.ms =500## fetch的最小数据尺寸,如果leader中尚未同步的数据不足此值,将会阻塞,直到满足条件
replica.fetch.min.bytes =1## leader 进行复制的线程数,增大这个数值会增加follower的IO
num.replica.fetchers=1## 每个replica检查是否将最高水位进行固化的频率
replica.high.watermark.checkpoint.interval.ms =5000## 是否允许控制器关闭broker ,若是设置为true,会关闭所有在这个broker上的leader,并转移到其他broker
controlled.shutdown.enable =false## 控制器关闭的尝试次数
controlled.shutdown.max.retries =3## 每次关闭尝试的时间间隔
controlled.shutdown.retry.backoff.ms =5000## 是否自动平衡broker之间的分配策略
auto.leader.rebalance.enable =false## leader的不平衡比例,若是超过这个数值,会对分区进行重新的平衡
leader.imbalance.per.broker.percentage =10## 检查leader是否不平衡的时间间隔
leader.imbalance.check.interval.seconds =300## 客户端保留offset信息的最大空间大小
offset.metadata.max.bytes
3.1.5 ZooKeeper相关
----------------------------------ZooKeeper 相关----------------------------------
##zookeeper集群的地址,可以是多个,多个之间用逗号分割 hostname1:port1,hostname2:port2,hostname3:port3
zookeeper.connect = localhost:2181## ZooKeeper的最大超时时间,就是心跳的间隔,若是没有反映,那么认为已经死了,不易过大
zookeeper.session.timeout.ms=6000## ZooKeeper的连接超时时间
zookeeper.connection.timeout.ms =6000## ZooKeeper集群中leader和follower之间的同步实际那
zookeeper.sync.time.ms =2000
配置的修改
其中一部分配置是可以被每个topic自身的配置所代替,例如
新增配置
bin/kafka-topics.sh --zookeeper localhost:2181--create --topic my-topic --partitions1--replication-factor1--config max.message.bytes=64000--config flush.messages=1修改配置
bin/kafka-topics.sh --zookeeper localhost:2181--alter --topic my-topic --config max.message.bytes=128000删除配置 :
bin/kafka-topics.sh --zookeeper localhost:2181--alter --topic my-topic --deleteConfig max.message.bytes
3.2 CONSUMER配置
最为核心的配置是group.id、zookeeper.connect。
## Consumer归属的组ID,broker是根据group.id来判断是队列模式还是发布订阅模式,非常重要group.id## 消费者的ID,若是没有设置的话,会自增consumer.id## 一个用于跟踪调查的ID ,最好同group.id相同client.id = group id value## 对于zookeeper集群的指定,可以是多个 hostname1:port1,hostname2:port2,hostname3:port3 必须和broker使用同样的zk配置zookeeper.connect=localhost:2182## zookeeper的心跳超时时间,超过这个时间就认为是dead消费者zookeeper.session.timeout.ms =6000## zookeeper的等待连接时间zookeeper.connection.timeout.ms =6000## zookeeper的follower同leader的同步时间zookeeper.sync.time.ms =2000## 当zookeeper中没有初始的offset时候的处理方式 。smallest :重置为最小值 largest:重置为最大值 anythingelse:抛出异常auto.offset.reset = largest## socket的超时时间,实际的超时时间是:max.fetch.wait + socket.timeout.ms.socket.timeout.ms=30*1000## socket的接受缓存空间大小socket.receive.buffer.bytes=64*1024##从每个分区获取的消息大小限制fetch.message.max.bytes =1024*1024## 是否在消费消息后将offset同步到zookeeper,当Consumer失败后就能从zookeeper获取最新的offsetauto.commit.enable =true## 自动提交的时间间隔auto.commit.interval.ms =60*1000## 用来处理消费消息的块,每个块可以等同于fetch.message.max.bytes中数值queued.max.message.chunks =10## 当有新的consumer加入到group时,将会reblance,此后将会有partitions的消费端迁移到新
## 的consumer上,如果一个consumer获得了某个partition的消费权限,那么它将会向zk注册
##"Partition Owner registry"节点信息,但是有可能此时旧的consumer尚没有释放此节点,
## 此值用于控制,注册节点的重试次数.rebalance.max.retries =4## 每次再平衡的时间间隔rebalance.backoff.ms =2000## 每次重新选举leader的时间refresh.leader.backoff.ms## server发送到消费端的最小数据,若是不满足这个数值则会等待,知道满足数值要求fetch.min.bytes =1## 若是不满足最小大小(fetch.min.bytes)的话,等待消费端请求的最长等待时间fetch.wait.max.ms =100## 指定时间内没有消息到达就抛出异常,一般不需要改consumer.timeout.ms = -1
3.3 PRODUCER的配置
比较核心的配置:metadata.broker.list、request.required.acks、producer.type、serializer.class。
## 消费者获取消息元信息(topics, partitions and replicas)的地址,配置格式是:host1:port1,host2:port2,也可以在外面设置一个vipmetadata.broker.list##消息的确认模式##0:不保证消息的到达确认,只管发送,低延迟但是会出现消息的丢失,在某个server失败的情况下,有点像TCP##1:发送消息,并会等待leader 收到确认后,一定的可靠性## -1:发送消息,等待leader收到确认,并进行复制操作后,才返回,最高的可靠性request.required.acks =0## 消息发送的最长等待时间request.timeout.ms =10000## socket的缓存大小send.buffer.bytes=100*1024## key的序列化方式,若是没有设置,同serializer.classkey.serializer.class## 分区的策略,默认是取模partitioner.class=kafka.producer.DefaultPartitioner## 消息的压缩模式,默认是none,可以有gzip和snappycompression.codec = none## 可以针对默写特定的topic进行压缩compressed.topics=null## 消息发送失败后的重试次数message.send.max.retries =3## 每次失败后的间隔时间retry.backoff.ms =100## 生产者定时更新topic元信息的时间间隔 ,若是设置为0,那么会在每个消息发送后都去更新数据topic.metadata.refresh.interval.ms =600*1000## 用户随意指定,但是不能重复,主要用于跟踪记录消息client.id=""------------------------------------------- 消息模式 相关 -------------------------------------------## 生产者的类型 async:异步执行消息的发送 sync:同步执行消息的发送producer.type=sync## 异步模式下,那么就会在设置的时间缓存消息,并一次性发送queue.buffering.max.ms =5000## 异步的模式下 最长等待的消息数queue.buffering.max.messages =10000## 异步模式下,进入队列的等待时间 若是设置为0,那么要么进入队列,要么直接抛弃queue.enqueue.timeout.ms = -1## 异步模式下,每次发送的最大消息数,前提是触发了queue.buffering.max.messages或是queue.buffering.max.ms的限制batch.num.messages=200## 消息体的系列化处理类 ,转化为字节流进行传输serializer.class= kafka.serializer.DefaultEncoder
4 Kafka内外网访问的设置
4.1 listeners和advertised.listeners
kafka的两个配置listeners和advertised.listeners。
一、listeners
kafka监听的网卡的ip,假设你机器上有两张网卡,内网192.168.0.213和外网101.89.163.1 如下配置
listeners=PLAINTEXT://192.168.0.213:9092
那么kafka只监听内网网卡,即只接收内网网卡的数据,如果你不能把外网网卡流量转发到内网网卡,那么kafka就接收不到外网网卡数据。
如果配置成外网ip同理。
当然你可以配置成0.0.0.0,监听所有网卡。
二、advertised.listeners
我们观察kafka的配置文件server.properties,会发现里面记录了zookeeper集群的各个节点的访问地址,但是并没有记录kafka兄弟节点的地址。
kafka节点启动后,会向zookeeper注册自己,同时从zookeeper中获取兄弟节点的地址,以便与兄弟节点通信。
同样,我们使用客户端连接kafka后,kafka返回给客户端的是集群各节点的访问地址,这个地址也是上面说的从zookeeper中获得的地址。
这个地址哪里来,就是kafka节点向zookeeper注册时提供的advertised.listeners。如果没有,就会使用listeners。
4.2 只需要内网访问kafka
kafka只监听内网网卡,即只接收内网网卡的数据。
listeners=PLAINTEXT://192.168.0.213:9092
4.3 只需要外网访问kafka
kafka只监听外网网卡,即只接收外网网卡的数据。
listeners=PLAINTEXT://101.89.163.1:9092
4.4 需要内外网访问
使用宿主机通过NAT映射搞出来的外网ip,此时kafka无法监听这个外网ip(因为不存在,启动就会报错)。这时候就是advertised.listeners真正发挥作用的时候了。
使用如下配置:
listeners=PLAINTEXT://192.168.0.213:9092
advertised.listeners=PLAINTEXT://101.89.163.1:9092
此时一个完整的kafka客户端访问服务端的流程:
(1)客户端访问101.89.163.1:9092,被kafka宿主机所在环境映射到内网192.168.0.213:9092,访问到了kafka节点,请求获得kafka服务端的访问地址。
(2)kafka从zookeeper拿到自己和其他兄弟节点通过advertised.listeners注册到zookeeper的101.89.163.1:9092等外网地址,作为kafka的服务端访问地址返回给客户端。
(3)客户端拿这些地址访问kafka集群,被kafka宿主机所在环境映射到各kafka节点的内网ip,访问到了kafka服务端…完美循环。
你可能会问已经配置了访问地址,为什么还要在第一次访问的时候请求获得kafka的访问地址。因为如果是kafka集群,你可以选择只给客户端配置一个kafka节点的地址(这样是不推荐的),但是客户端必须要访问集群中的每一个节点,所以必须通过这个节点获得集群中每一个节点的访问地址。
如果不配置advertised.listeners=PLAINTEXT://101.89.163.1:9092,你会发现虽然你给kafka客户端配置的访问地址是101.89.163.1:9092,但是kafka客户端访问时报错,报错原因是Connection to node -1[192.168.0.213:9092] could not be established. Broker may not be available.。这就是因为不配置advertised.listeners则advertised.listeners默认使用listeners配置的地址,客户端拿到的就是listeners配置的内网地址。
相关文章:
kafka-3-kafka应用的核心要点和内外网访问
kafka实战教程(python操作kafka),kafka配置文件详解 Kafka内外网访问的设置 1 kafka简介 根据官网的介绍,ApacheKafka是一个分布式流媒体平台,它主要有3种功能: (1)发布和订阅消息流,这个功能类似于消息队列&#x…...
VS2017+OpenCV4.5.5 决策树-评估是否发放贷款
决策树是一种非参数的监督学习方法,主要用于分类和回归。 决策树结构 决策树在逻辑上以树的形式存在,包含根节点、内部结点和叶节点。 根节点:包含数据集中的所有数据的集合内部节点:每个内部节点为一个判断条件,并且…...
Prometheus 记录规则和警报规则
前提环境: Docker环境 涉及参考文档: Prometheus 录制规则Prometheus 警报规则 语法检查规则 promtool check rules /path/to/example.rules.yml一:录制规则语法 groups 语法: groups:[ - <rule_group> ]rule_group…...
(API)接口测试的关键技术
接口测试也就是API测试,从名字上可以知道是面向接口的测试活动。所以在讲API测试之前,我们应该说清楚接口是什么,那么接口就是有特定输入和特定输出的一套逻辑处理单元,而对于接口调用方来说,不用知道自身的内部实现逻…...
快速排序算法原理 Quicksort —— 图解(精讲) JAVA
快速排序是 Java 中 sort 函数主要的排序方法,所以今天要对快速排序法这种重要算法的详细原理进行分析。 思路:首先快速排序之所以高效一部分原因是利用了离散数学中的传递性。 例如 1 < 2 且 2 < 3 所以可以推出 1 < 3。在快速排序的过程中巧…...
linux环境搭建私有gitlab仓库
搭建之前,需要安装相应的依赖包,并且要启动sshd服务(1).安装policycoreutils-python openssh-server openssh-clients [rootVM-0-2-centos ~]# sudo yum install -y curl policycoreutils-python openssh-server openssh-clients [rootVM-0-2-centos ~]…...
SpringSecurity授权
文章目录工具类使用自定义失败处理代码配置跨域其他权限授权hasAnyAuthority自定义权限校验方法基于配置的权限控制工具类 import javax.servlet.http.HttpServletResponse; import java.io.IOException;public class WebUtils {/*** 将字符串渲染到客户端** param response 渲…...
学习 Python 之 Pygame 开发坦克大战(一)
学习 Python 之 Pygame 开发坦克大战(一)Pygame什么是Pygame?初识pygame1. 使用pygame创建窗口2. 设置窗口背景颜色3. 获取窗口中的事件4. 在窗口中展示图片(1). pygame中的直角坐标系(2). 展示图片(3). 给部分区域设置颜色5. 在窗口中显示文字6. 播放音…...
2.5|iot冯|方元-嵌入式linux系统开发入门|2.13+2.18
一、 Linux 指令操作题(共5题(共 20 分,每小题 4分)与系统工作、系统状态、工作目录、文件、目录、打包压缩与搜索等主题相关。1.文件1.1文件属性1.2文件类型属性字段的第1个字符表示文件类型,后9个字符中,…...
一起Talk Android吧(第四百九十六回:自定义View实例二:环形进度条)
文章目录 知识回顾实现思路实现方法示例代码各位看官们大家好,上一回中咱们说的例子是"如何使用Java版MQTT客户端",这一回中咱们说的例子是"自定义View实例二:环形进度条"。闲话休提,言归正转,让我们一起Talk Android吧! 知识回顾 看官们,我们又回…...
上传图片尺寸校验
使用方法 ● Image ● URL ● onload代码: async validImageSize(file, imgWidth, imgHeight) {const img new Image()img.src URL.createObjectURL(file)const { w, h } await new Promise((resolve, reject) > {img.onload () > {const { width: w, he…...
【Python】缺失值处理和拉格朗日插值法(含源代码实现)
目录:缺失值处理和拉格朗日插值法一、前言二、理论知识三、代码实现一、前言 对于含有缺失值的数据集,如果通过删除小部分记录达到既定的目标,那么删除含有缺失值的记录的方法是最有效的。然而,这种方法也有很多问题,…...
SpringCloudAlibaba-Sentinel
一、介绍官网:https://github.com/alibaba/Sentinel/下载jar包,启动,访问http://localhost:8080/创建module添加如下依赖<!--SpringCloud ailibaba sentinel --><dependency><groupId>com.alibaba.cloud</groupId><artifactId>spring…...
【程序化天空盒】过程记录02:云扰动 边缘光 消散效果
写在前面 写在前面唉,最近筋疲力竭,课题组的东西一堆没做,才刚刚开始带着思考准备练习作品,从去年5月份开始到现在真得学了快一年了,转行学其他的真的好累,,不过还是加油! 下面是做…...
链表OJ(三) 反转链表合集
目录 反转链表 反转链表 II 链表中的节点每k个一组翻转 描述 给定一个单链表的头结点pHead(该头节点是有值的,比如在下图,它的val是1),长度为n,反转该链表后,返回新链表的表头。 数据范围: 0≤n≤10000≤…...
SQLSERVER2019安装步骤过程
第一步官网下载SQLSERVER软件包 目前官网只能下载最新版本2022版本。 通过迅雷下载网址 SQL Server 2019 Enterprise (x64) - DVD (Chinese-Simplified)企业版 ed2k://|file|cn_sql_server_2019_enterprise_x64_dvd_2bfe815a.iso|1632086016|58C258FF0F1D006DD3C1F5F17AF3E…...
Java模块化概述
3 模块化 3.1 模块化概述 Java语言随着这些年的发展已经成为了一]影响深远的编程语言,无数平台,系统都采用Java语言编写。但是,伴随着发展,Java也越来越庞大,逐渐发展成为-门“臃肿” 的语言。而且,无论是运行个大型的…...
Connext DDSPersistence Service持久性服务(2)
可选数据库组件及兼容性当Persistence Service配置为PERSISTENT模式时,您可以选择将主题数据存储在文件中还是存储在外部关系数据库中。 唯一支持的外部数据库是MySQL。 当PersistenceService在PERSISTENT模式下使用时,您可以将其配置为将DDS样本存储到关系数据库中,例如MyS…...
MongoDB
MongoDB 应用场景 在传统数据库(Mysql),在数据操作的 **High performance 对数据库高并发读写的需求、Hugu Storage 对海量数据的高效率存储和访问的需求、High Scalability && High Availability 对数据库高扩展和高可用性的需…...
python 迭代器生成器
目录 一、可迭代对象 1.1 判断是否为可迭代对象 二、迭代器 2.1 判断对象是否是一个迭代器 2.2 手写一个迭代器 2.3 迭代器应用场景 三、生成器 3.1 生成器介绍 3.2 使用yield 关键字 生成器,来实现迭代器 3.3 生成器(yield关键字)…...
Iceberg基于Spark MergeInto语法实现数据的增量写入
SPARK SQL 基本语法 示例SQL如下 MERGE INTO target_table t USING source_table s ON s.id t.id //这里是JOIN的关联条件 WHEN MATCHED AND s.opType delete THEN DELETE // WHEN条件是对当前行进行打标的匹配条件 WHEN MATCHED AND s.opType update THEN…...
JavaScript Array(数组) 对象
JavaScript 中的 Array(数组)对象是一种用来存储一系列值的容器,它可以包含任意类型的数据,包括数字、字符串、对象等等。通过使用数组对象,我们可以轻松地组织和处理数据,以及进行各种操作,比如…...
Debian如何更换apt源
中科大 deb https://mirrors.ustc.edu.cn/debian/ stretch main non-free contrib deb https://mirrors.ustc.edu.cn/debian/ stretch-updates main non-free contrib deb https://mirrors.ustc.edu.cn/debian/ stretch-backports main non-free contrib deb-src https://mirr…...
Connext DDSPersistence Service持久性服务
DDS持久性服务,它保存了DDS数据样本,以便即使发布应用程序已经终止,也可以稍后将其发送到加入系统的订阅应用程序。 简介Persistence Service是一个Connext DDS应用程序,它将DDS数据样本保存到临时或永久存储中,因此即使发布应用程序已经终止,也可以稍后将其交付给加入系…...
自抗扰控制ADRC之微分器TD
目录 前言 1 全程快速微分器 1.1仿真分析 1.2仿真模型 1.3仿真结果 1.4结论 2 Levant微分器 2.1仿真分析 2.2仿真模型 2.3仿真结果 3.总结 前言 工程上信号的微分是难以得到的,所以本文采用微分器实现带有噪声的信号及其微分信号提取,从而实现…...
链表学习之复制含随机指针的链表
链表解题技巧 额外的数据结构(哈希表);快慢指针;虚拟头节点; 复制含随机指针的链表 该链表节点的结构如下: class ListRandomNode { public:ListRandomNode() : val(0), next(nullptr), random(nullptr…...
【人脸检测】Yolov5Face:优秀的one-stage人脸检测算法
论文题目:《YOLO5Face: Why Reinventing a Face Detector》 论文地址:https://arxiv.org/pdf/2105.12931.pdf 代码地址:https://github.com/deepcam-cn/yolov5-face 1.简介 近年来,CNN在人脸检测方面已经得到广泛的应用。但是许多…...
【Unity3d】Unity与Android之间通信
在unity开发或者sdk开发经常遇到unity与移动端原生层之间进行通信,这里把它们之间通信做一个整理。 关于Unity与iOS之间通信,参考【Unity3d】Unity与iOS之间通信 Unity(c#)调用Android (一)、编写Java代码 实际上,任何已经存在的Java代码…...
Allegro如何更改DRC尺寸大小操作指导
Allegro如何更改DRC尺寸大小操作指导 在做PCB设计的时候,DRC可以辅助设计,有的时候DRC的尺寸过大会影响视觉,Allegro支持将DRC的尺寸变小或者改大 如下图,DRC尺寸过大 如何改小,具体操作如下 点击Setup选择Design Parameters...
Mongodb WT_PANIC: WiredTiger library panic
文章目录故障现象排查过程1.查看Log2.同步恢复数据故障现象 周五突然收到Mongo实例莫名奇妙挂了告警,一般都是RS复制集架构模式(5节点),查看此实例角色为SECONDAR,挂了暂时不影响线上业务,但还是需要尽快修…...
关于动物自己做的网站/seo优化方案模板
1、字符型输入框: (1)字符型输入框:英文全角、英文半角、数字、空或者空格、特殊字符,特别要注意单引号和&符号。不要直接输入特殊字符时,使用“粘贴、拷贝”功能尝试输入。 (2࿰…...
建建建设网站公司网站/西地那非片多少钱一盒
面试题 如何找回 root 密码,如果我们不小心,忘记 root 密码,怎么找回。 思路: 进入到 单用户模式,然后修改 root 密码。因为进入单用户模式,root 不需要密码就可 以登录。 实现步骤 1、开机->在引导时输入 回车键…...
网页设计页面尺寸/seo岗位有哪些
Python实战社群Java实战社群长按识别下方二维码,按需求添加扫码关注添加客服进Python社群▲扫码关注添加客服进Java社群▲作者丨酷酷的哀殿来源丨酷酷的哀殿的博客https://ai-chan.top/2020/09/27/iOS-Snapshots/你是否了解过iOS 是如何获取夜间模式启动图缓存路径&…...
毕设做购物网站容易吗/seo一键优化
泛型:为了让集合能够记住集合内元素各个类型,且能够达到只要编译时不出现问题,运行时就不会出现类型异常的解决方案。 泛型又从称为参数化类型,是一种编译时类型安全检测机制,类型参数的魅力在于使得程序具有可读性和…...
民政府公众信息网站建设/推广一次多少钱
所有题目均有四种语言实现。C++ 实现目录、Python实现目录、Java实现目录、JavaScript实现目录 题目 题目描述:去除文本多余空格,但不去除配对单引号之间的多余空格。给出关键词的起始和结束下标,去除多余空格后刷新关键词的起始和结束下标。输入描述: 输入为两行字符串: 第…...
为什么要建设学校网站/建材企业网站推广方案
模拟实现C智能指针shared_ptr和weak_ptr 仿写C的shared_ptr和weak_ptr 当强智能指针shared_ptr的引用计数为0时,析构资源 当弱智能指针weak_ptr的引用计数为0时,析构引用计数对象 #include<iostream> #include<new> #include<stdio.h&g…...