top域名的网站/项目推广方案怎么写
一、KafKa概述
1.1 定义
KafKa是一个分布式的基于发布/订阅模式的消息队列,主要应用于大数据试试处理领域
是一个分布式、支持分区的(partition)、多副本的(replica),基于zookeeper协调的分布式消息系统,它的最大的特性就是可以实时的处理大量数据以满足各种需求场景:比如基于hadoop的批处理系统、低延迟的实时系统、Storm/Spark流式处理引擎,web/nginx日志、访问日志,消息服务等等,用scala语言编写,Linkedin于2010年贡献给了Apache基金会并成为顶级开源项目。
名称 | 解释 |
---|---|
Broker | 消息中间件处理节点,一个Kafka节点就是一个broker,一个或者多个Broker可以组成一个Kafka集群 |
Topic | Kafka根据topic对消息进行归类,发布到Kafka集群的每条消息都需要指定一个topic |
Producer | 消息生产者,向Broker发送消息的客户端 |
Consumer | 消息消费者,从Broker读取消息的客户端 |
ConsumerGroup | 每个Consumer属于一个特定的Consumer Group,一条消息可以被多个不同的Consumer Group消费,但是一个Consumer Group中只能有一个Consumer能够消费该消息 |
Partition | 物理上的概念,一个topic可以分为多个partition,每个partition内部消息是有序的 |
2 kafka&zookeper安装/配置/启动
2.1 kafka下载和安装
2.1.1 安装包下载
本次安装的kafka版本是3.5.0, 自带zookeeper
https://archive.apache.org/dist/kafka/3.5.0/kafka_2.12-3.5.0.tgz# 1 下载安装包到/opt目录下
# 2 进行解压 tar -xzvf kafka_2.12-3.5.0.tgz -C /opt
2.1.2 准备集群节点
192.168.190.147 node1
192.168.190.145 node2
192.168.190.142 node3
2.2 kafka&zookeeper配置
先以node1基点为例进行配置
2.2.1 server.properties配置
先以node1基点为例进行配置
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.#
# This configuration file is intended for use in ZK-based mode, where Apache ZooKeeper is required.
# See kafka.server.KafkaConfig for additional details and defaults
############################## Server Basics ############################## The id of the broker. This must be set to a unique integer for each broker.
broker.id=0 # 每个kafka服务器是一个broker,如果kafka存在集群则注意填写该值############################# Socket Server Settings ############################## The address the socket server listens on. If not configured, the host name will be equal to the value of
# java.net.InetAddress.getCanonicalHostName(), with PLAINTEXT listener name, and port 9092.
# FORMAT:
# listeners = listener_name://host_name:port
# EXAMPLE:
# listeners = PLAINTEXT://your.host.name:9092
listeners=PLAINTEXT://node1:9092 # 这里用来配置本节点的kafka服务的监听端口# Listener name, hostname and port the broker will advertise to clients.
# If not set, it uses the value for "listeners".
#advertised.listeners=PLAINTEXT://your.host.name:9092# Maps listener names to security protocols, the default is for them to be the same. See the config documentation for more details
#listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL# The number of threads that the server uses for receiving requests from the network and sending responses to the network
num.network.threads=3# The number of threads that the server uses for processing requests, which may include disk I/O
num.io.threads=8# The send buffer (SO_SNDBUF) used by the socket server
socket.send.buffer.bytes=102400# The receive buffer (SO_RCVBUF) used by the socket server
socket.receive.buffer.bytes=102400# The maximum size of a request that the socket server will accept (protection against OOM)
socket.request.max.bytes=104857600############################# Log Basics ############################## A comma separated list of directories under which to store log files
log.dirs=/opt/kafka/kafka-logs # kafka的log文件实际存储的是kafka的topic中的消息文件# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across
# the brokers.
num.partitions=1# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
# This value is recommended to be increased for installations with data dirs located in RAID array.
num.recovery.threads.per.data.dir=1############################# Internal Topic Settings #############################
# The replication factor for the group metadata internal topics "__consumer_offsets" and "__transaction_state"
# For anything other than development testing, a value greater than 1 is recommended to ensure availability such as 3.
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1############################# Log Flush Policy ############################## Messages are immediately written to the filesystem but by default we only fsync() to sync
# the OS cache lazily. The following configurations control the flush of data to disk.
# There are a few important trade-offs here:
# 1. Durability: Unflushed data may be lost if you are not using replication.
# 2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush.
# 3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to excessive seeks.
# The settings below allow one to configure the flush policy to flush data after a period of time or
# every N messages (or both). This can be done globally and overridden on a per-topic basis.# The number of messages to accept before forcing a flush of data to disk
#log.flush.interval.messages=10000# The maximum amount of time a message can sit in a log before we force a flush
#log.flush.interval.ms=1000############################# Log Retention Policy ############################## The following configurations control the disposal of log segments. The policy can
# be set to delete segments after a period of time, or after a given size has accumulated.
# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
# from the end of the log.# The minimum age of a log file to be eligible for deletion due to age
log.retention.hours=168# A size-based retention policy for logs. Segments are pruned from the log unless the remaining
# segments drop below log.retention.bytes. Functions independently of log.retention.hours.
#log.retention.bytes=1073741824# The maximum size of a log segment file. When this size is reached a new log segment will be created.
#log.segment.bytes=1073741824# The interval at which log segments are checked to see if they can be deleted according
# to the retention policies
log.retention.check.interval.ms=300000############################# Zookeeper ############################## Zookeeper connection string (see zookeeper docs for details).
# This is a comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# You can also append an optional chroot string to the urls to specify the
# root directory for all kafka znodes.
zookeeper.connect=node1:2181,node2:2181,node3:2181 # 这里用于配置zookeeper的地址和端口, 如果存在zk集群,则填写集群的多个ip:port# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=18000############################# Group Coordinator Settings ############################## The following configuration specifies the time, in milliseconds, that the GroupCoordinator will delay the initial consumer rebalance.
# The rebalance will be further delayed by the value of group.initial.rebalance.delay.ms as new members join the group, up to a maximum of max.poll.interval.ms.
# The default value for this is 3 seconds.
# We override this to 0 here as it makes for a better out-of-the-box experience for development and testing.
# However, in production environments the default value of 3 seconds is more suitable as this will help to avoid unnecessary, and potentially expensive, rebalances during application startup.
group.initial.rebalance.delay.ms=0
2.2.2 zookeeper.properties配置
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# the directory where the snapshot is stored.
dataDir=/opt/kafka/kafka_2.12-3.5.0/zookeeper/data # 定义zk的数据存储目录,需要手动 mkdir
dataLogDir=/opt/kafka/kafka_2.12-3.5.0/zookeeper/log # 定义zk的日志存储目录,需要手动 mkdir
# the port at which the clients will connect
clientPort=2181
# disable the per-ip limit on the number of connections since this is a non-production config
maxClientCnxns=0
# Disable the adminserver by default to avoid port conflicts.
# Set the port to something non-conflicting if choosing to enable this
admin.enableServer=false
# admin.serverPort=8080#设置连接参数,添加如下配置
#为zk的基本时间单元,毫秒
tickTime=2000
#Leader-Follower初始通信时限 tickTime*10
initLimit=10
#Leader-Follower同步通信时限 tickTime*5
syncLimit=5# #设置broker Id的服务地址
server.1=node1:2888:3888
server.2=node2:2888:3888
server.3=node3:2888:3888
2.2.3 创建myid文件
在zookeeper.properties配置的数据目录下(/opt/kafka/kafka_2.12-3.5.0/zookeeper/data),创建一个文本文件myid, 内容为每个zookeeper节点的编号。因为是3个节点kafka集群,所以zookeeper集群也是3个节点,他们的编号分别是1、2、3.
经过以上三个步骤就配置好了node1节点,按照同样的方法配置node2和node3节点。
2.3 kafka&zookeeper的启动
# 注意要先启动zookeeper再启动kafka# 启动zookeeper
./zookeeper-server-start.sh -daemon ../config/zookeeper.properties# 启动kafka
./kafka-server-start.sh -daemon ../config/server.properties# 停止zookeeper
./zookeeper-server-stop.sh# 停止kafka
./kafka-server-stop.sh
3 kafka相关操作命令
3.1 创建kafka的topic
./kafka-topics.sh --create --bootstrap-server node1:9092 --replication-factor 1 --partitions 1 --topic test
# 1 说明:
# --bootstrap-server 192.168.190.136:9092 表示kafka地址和端口
#--zookeeper:定义 zookeeper 集群服务器地址,如果有多个 IP 地址使用逗号分割,一般使用一个 IP 即可/ 2.20废弃!!!!!!!!!!!!!
#--replication-factor:定义分区副本数,1 代表单副本,建议为 2
#--partitions:定义分区数
#--topic:定义 topic 名称
3.2 查看topic列表
./kafka-topics.sh --list --bootstrap-server node1:9092
# 2 说明:--bootstrap-server node1:9092 表示kafka地址和端口
3.3 查看topic信息
./kafka-topics.sh --describe --bootstrap-server node1:9092 --topic test
3.4 创建kafka生产者
kafka自带了一个producer的命令客户端,可以从本地文件中读取内容,或者我们也可以一命令行中直接输入内容,并将这些内容以消息的形式送到kafka集群中。在默认情况下,没一行会被当成一个独立的消息。使用kafka的发送消息的客户端,指定发送到kafka服务器地址和topic
# 创建一个kafka控制台的生产者,创建后可在控制台直接输入信息发送到对应的topic下
./kafka-console-producer.sh --broker-list node1:9092 --topic test
3.5 创建kafka消费者
# 创建一个kafka控制台的消费者,创建后可直接取对应的topic的数据并输出
./kafka-console-consumer.sh --bootstrap-server node1:9092 --topic test --from-beginning#--from-beginning:会把主题中以往所有的数据都读取出来
3.6 查看消费者组列表
./kafka-consumer-groups.sh --bootstrap-server node1:9092 --list
3.7 查看kafka中某一个消费者组的消费情况
./kafka-consumer-groups.sh --bootstrap-server node1 --group console-consumer-6920 --describe# console-consumer-6920: 表示消费者组
# 返回如下
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
console-consumer-6920 test 0 - 12 - console-consumer-e162ef6d-600f-45d1-8198-af7e230642a7 /192.168.190.136 console-consumer
4 zookeeper相关命令
4.1 连接zookeeper
# ./zookeeper-shell.sh zookeeper_server:port --> 连接zookeeper
./zookeeper-shell.sh node1:2181# 不进入zookeeper执行相关指令
./zookeeper-shell.sh node1:2181 ls / # 查看当前 ZooKeeper 中所包含的内容
./zookeeper-shell.sh node1:2181 ls -s / # 查看当前节点数据
./zookeeper-shell.sh node1:2181 ls /brokers/ids #
./zookeeper-shell.sh node1:2181 ls /brokers/topics
./zookeeper-shell.sh node1:2181 get /brokers/ids/0
4.2 查看当前Zookeeper中所含的内容
# 查看当前Zookeeper中所含的内容
ls /
4.3 查看当前节点数据
ls -s /
# 返回如下
[admin, brokers, cluster, config, consumers, controller, controller_epoch, feature, isr_change_notification, latest_producer_id_block, log_dir_event_notification, zookeeper]
cZxid = 0x0 # 数据节点创建时的事务 ID
ctime = Wed Dec 31 16:00:00 PST 1969 # 数据节点创建时的时间
mZxid = 0x0 # 数据节点最后一次更新时的事务 ID
mtime = Wed Dec 31 16:00:00 PST 1969 # 数据节点最后一次更新时的时间
pZxid = 0x200000057 # 数据节点的子节点最后一次被修改时的事务 ID
cversion = 18 # 子节点的更改次数
dataVersion = 0 # 节点数据的更改次数
aclVersion = 0 # 节点的 ACL 的更改次数
ephemeralOwner = 0x0 # 如果节点是临时节点,则表示创建该节点的会话的 SessionID;如果节点是持久节点,则该属性值为 0
dataLength = 0 # 数据内容的长度
numChildren = 12 # 数据节点当前的子节点个数
4.4 创建节点
#创建序列化永久节点:
create -s /testnode test
#创建临时节点
create -e /testnode-temp testtemp
#创建永久节点:
create /testnode-p testp
4.5 获取节点
ls path [watch]
get path [watch]
ls -s path [watch]
4.6 修改节点
ls -s /
get -s /testnode-temp
set /testnode-temp 123
get -s /testnode-temp
4.7 监听节点
get /testnode-temp watch
set /testnode-temp testwatch
#他会回调Watch得到触发结果
4.8 删除节点
#普通删除的命令
delete path [version]
#递归删除的命令
rmr path [version]deleteall path
二、 知识点
1 主题Topic
在逻辑的对消息的种类进行划分
2 分区partition
当Topic中的消息非常多的时候,会导致kafka的log日志文件特别大(kafka的日志文件存储的是消息)。
2.1 分区的作用
- 可以分布式存储
- 可以并行写
2.2 实例说明1
- 为一个主题创建多个分区
# 创建一个名为test的Topic, 并为其创建2个partition
./kafka-topics.sh --create --bootstrap-server node1:9092 --replication-factor 1 --partitions 2 --topic mytest
- 查看topic的分区信息
./kafka-topics.sh --describe --bootstrap-server node1:9092 --topic mytest# 返回
root@ubuntu:/kafka/bin# ./kafka-topics.sh --describe --bootstrap-server node1:9092 --topic mytest
Topic: mytest TopicId: ulEszEwcT0i3Iu5TyfIglg PartitionCount: 2 ReplicationFactor: 1 Configs: Topic: mytest Partition: 0 Leader: 2 Replicas: 2 Isr: 2Topic: mytest Partition: 1 Leader: 1 Replicas: 1 Isr: 1
- 查看kafka日志文件
当设置mytest的partition==2的时候,会在node1上创建一个mytest-1, 在node2上创建一个mytest-0. 因为kafka的是集群搭建,所以在给mytest创建分区的时候会随机分配到集群中的两个节点上。
可以看到 kafka默认的toipic:__consumer_offsets,具有50个partition,分散在node1和node2的节点上。。# node1节点
drwxr-xr-x 29 root root 4096 Aug 10 02:21 ./
drwxr-xr-x 4 root root 4096 Aug 7 21:18 ../
-rw-r--r-- 1 root root 0 Aug 7 21:18 cleaner-offset-checkpoint
drwxr-xr-x 2 root root 4096 Aug 10 02:18 __consumer_offsets-1/
drwxr-xr-x 2 root root 4096 Aug 10 02:18 __consumer_offsets-11/
drwxr-xr-x 2 root root 4096 Aug 10 02:18 __consumer_offsets-13/
drwxr-xr-x 2 root root 4096 Aug 10 02:18 __consumer_offsets-15/
drwxr-xr-x 2 root root 4096 Aug 10 02:18 __consumer_offsets-17/
drwxr-xr-x 2 root root 4096 Aug 10 02:18 __consumer_offsets-19/
drwxr-xr-x 2 root root 4096 Aug 10 02:18 __consumer_offsets-21/
drwxr-xr-x 2 root root 4096 Aug 10 02:18 __consumer_offsets-23/
drwxr-xr-x 2 root root 4096 Aug 10 02:18 __consumer_offsets-25/
drwxr-xr-x 2 root root 4096 Aug 10 02:18 __consumer_offsets-27/
drwxr-xr-x 2 root root 4096 Aug 10 02:18 __consumer_offsets-29/
drwxr-xr-x 2 root root 4096 Aug 10 02:18 __consumer_offsets-3/
drwxr-xr-x 2 root root 4096 Aug 10 02:18 __consumer_offsets-31/
drwxr-xr-x 2 root root 4096 Aug 10 02:18 __consumer_offsets-33/
drwxr-xr-x 2 root root 4096 Aug 10 02:18 __consumer_offsets-35/
drwxr-xr-x 2 root root 4096 Aug 10 02:18 __consumer_offsets-37/
drwxr-xr-x 2 root root 4096 Aug 10 02:18 __consumer_offsets-39/
drwxr-xr-x 2 root root 4096 Aug 10 02:18 __consumer_offsets-41/
drwxr-xr-x 2 root root 4096 Aug 10 02:18 __consumer_offsets-43/
drwxr-xr-x 2 root root 4096 Aug 10 02:18 __consumer_offsets-45/
drwxr-xr-x 2 root root 4096 Aug 10 02:18 __consumer_offsets-47/
drwxr-xr-x 2 root root 4096 Aug 10 02:18 __consumer_offsets-49/
drwxr-xr-x 2 root root 4096 Aug 10 02:18 __consumer_offsets-5/
drwxr-xr-x 2 root root 4096 Aug 10 02:18 __consumer_offsets-7/
drwxr-xr-x 2 root root 4096 Aug 10 02:18 __consumer_offsets-9/
drwxr-xr-x 2 root root 4096 Aug 10 02:18 faust-group-01-__assignor-__leader-0/
-rw-r--r-- 1 root root 0 Aug 7 21:18 .lock
-rw-r--r-- 1 root root 4 Aug 10 02:21 log-start-offset-checkpoint
-rw-r--r-- 1 root root 88 Aug 10 02:16 meta.properties
drwxr-xr-x 2 root root 4096 Aug 10 02:18 mytest-1/
-rw-r--r-- 1 root root 657 Aug 10 02:21 recovery-point-offset-checkpoint
-rw-r--r-- 1 root root 657 Aug 10 02:21 replication-offset-checkpoint# node2节点
root@ubuntu:/opt/kafka/kafka_2.12-3.5.0/bin# ll /opt/kafka/kafka-logs/
total 144
drwxr-xr-x 32 root root 4096 Aug 10 02:21 ./
drwxr-xr-x 4 root root 4096 Aug 7 21:18 ../
-rw-r--r-- 1 root root 0 Aug 7 21:18 cleaner-offset-checkpoint
drwxr-xr-x 2 root root 4096 Aug 10 02:18 __consumer_offsets-0/
drwxr-xr-x 2 root root 4096 Aug 10 02:18 __consumer_offsets-10/
drwxr-xr-x 2 root root 4096 Aug 10 02:18 __consumer_offsets-12/
drwxr-xr-x 2 root root 4096 Aug 10 02:18 __consumer_offsets-14/
drwxr-xr-x 2 root root 4096 Aug 10 02:18 __consumer_offsets-16/
drwxr-xr-x 2 root root 4096 Aug 10 02:18 __consumer_offsets-18/
drwxr-xr-x 2 root root 4096 Aug 10 02:18 __consumer_offsets-2/
drwxr-xr-x 2 root root 4096 Aug 10 02:18 __consumer_offsets-20/
drwxr-xr-x 2 root root 4096 Aug 10 02:18 __consumer_offsets-22/
drwxr-xr-x 2 root root 4096 Aug 10 02:18 __consumer_offsets-24/
drwxr-xr-x 2 root root 4096 Aug 10 02:18 __consumer_offsets-26/
drwxr-xr-x 2 root root 4096 Aug 10 02:18 __consumer_offsets-28/
drwxr-xr-x 2 root root 4096 Aug 10 02:18 __consumer_offsets-30/
drwxr-xr-x 2 root root 4096 Aug 10 02:18 __consumer_offsets-32/
drwxr-xr-x 2 root root 4096 Aug 10 02:18 __consumer_offsets-34/
drwxr-xr-x 2 root root 4096 Aug 10 02:18 __consumer_offsets-36/
drwxr-xr-x 2 root root 4096 Aug 10 02:18 __consumer_offsets-38/
drwxr-xr-x 2 root root 4096 Aug 10 02:18 __consumer_offsets-4/
drwxr-xr-x 2 root root 4096 Aug 10 02:18 __consumer_offsets-40/
drwxr-xr-x 2 root root 4096 Aug 10 02:18 __consumer_offsets-42/
drwxr-xr-x 2 root root 4096 Aug 10 02:18 __consumer_offsets-44/
drwxr-xr-x 2 root root 4096 Aug 10 02:18 __consumer_offsets-46/
drwxr-xr-x 2 root root 4096 Aug 10 02:18 __consumer_offsets-48/
drwxr-xr-x 2 root root 4096 Aug 10 02:18 __consumer_offsets-6/
drwxr-xr-x 2 root root 4096 Aug 10 02:18 __consumer_offsets-8/
drwxr-xr-x 2 root root 4096 Aug 10 02:18 example-0/
drwxr-xr-x 2 root root 4096 Aug 10 02:18 faust-group-02-__assignor-__leader-0/
-rw-r--r-- 1 root root 0 Aug 7 21:18 .lock
-rw-r--r-- 1 root root 4 Aug 10 02:21 log-start-offset-checkpoint
-rw-r--r-- 1 root root 88 Aug 10 02:18 meta.properties
drwxr-xr-x 2 root root 4096 Aug 10 02:18 myid-__assignor-__leader-0/
drwxr-xr-x 2 root root 4096 Aug 10 02:18 mytest-0/
-rw-r--r-- 1 root root 703 Aug 10 02:21 recovery-point-offset-checkpoint
-rw-r--r-- 1 root root 703 Aug 10 02:21 replication-offset-checkpoint
drwxr-xr-x 2 root root 4096 Aug 10 02:21 wzp-0/
2.2 实例说明2
kafka每个消费者会维护自己消费的主题的偏移量offset, 并把这个offset提交给kafka内部的topic: __consumer_offsets, 提交过去的时候,key是consumerGroupId+topic+分区号,value就是当前offset的值,kafka会定期清理topic中的消息,最后就保留最新的那条数据。
因为_consumer_offsets可能会接收高并发的请求,kafka默认给其分配50个分区(可以通过offsets.topic.num.partitions设置),这样可以通过加机器的方式增大并发。
通过如下公式可以选出consumer消费的offset要提交到_consumer_offsets的哪个分区。
公式: hash(consumerGroupId) % __consumer_offsets主题的分区数
3 副本replication
副本是对分区的备份。在集群中,不同的副本会被部署到不同的broker上。同一个分区可以设定多个相同的副本,其中一个叫做 leader,其他叫做 follower,生产者和消费者只和 leader 交互,生产者生产的消息发送到 leader 之后,其他 follower 才能同从 leader 中同步消息。当 leader 发生故障的时候会从 follower 中选择一个称为新的leader。
Kafka 的多分区(Partition)以及多副本(Replica)机制有什么好处呢?
Kafka 通过给特定 Topic 指定多个 Partition, 而各个 Partition 可以分布在不同的 Broker 上, 这样便能提供比较好的并发能力(负载均衡)。
Partition 可以指定对应的 Replica 数, 这也极大地提高了消息存储的安全性, 提高了容灾能力,不过也相应的增加了所需要的存储空间。
# 创建 1个topic 2个分区 3个副本
./kafka-topics.sh --create --bootstrap-server node1:9092 --replication-factor 3 --partitions 2 --topic mytest01# 返回
root@ubuntu:/kafka/bin# ./kafka-topics.sh --describe --bootstrap-server node1:9092 --topic mytest01
Topic: mytest01 TopicId: MSmYL0CXStu0g_29irzH3A PartitionCount: 2 ReplicationFactor: 2 Configs: Topic: mytest01 Partition: 0 Leader: 2 Replicas: 2,1 Isr: 2,1Topic: mytest01 Partition: 1 Leader: 1 Replicas: 1,2 Isr: 1,2# 说明
mytest01的0分区的leader在node2节点上
mytest01的1分区的leader在node1节点上# leader:副本里的概念
每个partition都有一个broker作为leader
生产者和消费者的读写请求都是发生在leader所在的分区,而不是副本上# fllower: leader处理所有针对这个partition分区的读写请求。follower被动复制leader,不提供读写
10 如何保证消息的消费顺序
Kafka 通过以下机制来保证消费的顺序性:
- 分区: Kafka 中的每个主题可以分为多个分区,每个分区都只由一个消费者进行消费。分区是 Kafka 中实现水平扩展和并行处理的基本单位。对于同一分区内的消息,它们的顺序是有序的,所以确保每个分区只被一个消费者处理可以保证该分区内消息的顺序性。
- 偏移量(Offset): 每个分区中的消息都有一个唯一标识符,称为偏移量(Offset)。偏移量表示消息在分区内的顺序。消费者可以通过指定偏移量来读取分区中的特定消息。Kafka 使用偏移量来记录消费者的消费进度,并在消费者恢复或重启时从上一次的偏移量处继续消费,保证了消息的有序性。
- 消费者组: 多个消费者可以组成一个消费者组,并同时消费同一个主题的不同分区。当启动多个消费者时,Kafka 会自动分配分区给不同的消费者,保证每个分区只由一个消费者处理。这样,每个消费者负责消费自己所分配的分区,而不会出现多个消费者同时消费同一个分区的情况,从而保证了消费的顺序性。
需要注意的是,Kafka 只能在同一个分区内保证消息的顺序性,而不同分区之间的顺序是无法保证的。如果需要对多个分区的消息进行有序处理,可以通过将多个分区的数据合并到一个分区中,或使用其他流处理工具(如 Apache Flink 或 Apache Spark Streaming)来实现。
总结起来,Kafka 通过分区、偏移量和消费者组等机制来保证消费的顺序性。每个分区内的消息是有序的,而消费者组确保每个分区只由一个消费者处理,从而保证了消费整体上的顺序性。
11 zookeeper在kafka中的作用
用于进行broker和topic的注册
相关文章:

KafKa集群搭建和知识点
一、KafKa概述 1.1 定义 KafKa是一个分布式的基于发布/订阅模式的消息队列,主要应用于大数据试试处理领域 是一个分布式、支持分区的(partition)、多副本的(replica),基于zookeeper协调的分布式消息系统&a…...

剑指 Offer 56 - I. 数组中数字出现的次数题解
题目描述:剑指 Offer 56 - I. 数组中数字出现的次数 - 力扣(LeetCode) 一个整型数组 nums 里除两个数字之外,其他数字都出现了两次。请写程序找出这两个只出现一次的数字。要求时间复杂度是O(n),空间复杂度是O(1)。 示…...

CSDN付费专栏写作协议
一、总则 1.1、欢迎您选用CSDN付费专栏服务(“本服务”)。以下所述条款和条件即构成您与CSDN就使用本服务所达成的协议(“本协议)。本协议被视为《CSDN用户服务条款》(链接:https://passport.csdn.net/ser…...

[保研/考研机试] KY30 进制转换-大整数转二进制 清华大学复试上机题 C++实现
描述 将一个长度最多为30位数字的十进制非负整数转换为二进制数输出。 输入描述: 多组数据,每行为一个长度不超过30位的十进制非负整数。 (注意是10进制数字的个数可能有30个,而非30bits的整数) 输出描述ÿ…...

vue3多条件搜索功能
搜索功能在后台管理页面中非常常见,本篇就着重讲一下vue3-admin-element框架中如何实现一个顶部多条件搜索功能 一、首先需要在vue页面的<template></template>中写入对应的结构 <!-- 搜索 --><div style"display: flex; justify-content…...

C++20协程
目录 协程原理: 进程、线程和协程的区别和联系编辑 协程在IO多路复用中 协程的目的: 协程的优势: 协程原理: (学习来源:幼麟实验室) 线程是进程中的执行体,拥有一个…...

Zabbix 6.0 监控其他
文章目录 一、Zabbix 监控 Windows 系统1)下载 Windows 客户端 Zabbix agent 22)安装客户端,配置3)在服务端 Web 页面添加主机,关联模板 二、Zabbix 监控 java 应用1)客户端开启 java jmxremote 远程监控功…...

Django rest_framework Serializer中的create、Views中的create/perform_create的区别
Django rest_framework Serializer中的create、Views中的create/perform_create的区别 对于后端来说,前后端分离的方式能让前后端的开发都爽。和所有的爽一样,每爽一次都要付出一定的代价。而前后端分离的代价,就是后端要面对巨量的模块化的功…...

差异性分析傻瓜版
path1输入你的第一个Excel path2输入你的第二个Excel DEG.dig <- function(path1,path2) { require(xlsx) require(tidyverse) require(limma) require(edgeR) E<- read.xlsx (path1,sheetIndex 1,header 1) %>% column_to_rownames(var &…...

Keystone Automotive EDI 需求分析
Keystone Automotive 是一家知名的汽车零部件销售卖场,自创立以来,在汽车行业取得了卓越的成就。作为一家专业的汽车零部件供应商,Keystone Automotive 致力于为客户提供优质的产品和卓越的服务。公司的经营范围涵盖广泛,涉及多个…...

jmeter创建一个压测项目
1.jemeter新建一个项目: 2.接下来对Thread进行描述,也可以先使用默认的Thread进行操作。 3.添加http请求头的信息。按照如图所示操作 4.在请求头里面添加必要的字段,可以只填必要字段就可以 5.添加Http请求信息,如下图ÿ…...

CEC2013(MATLAB):淘金优化算法GRO求解CEC2013的28个函数
一、淘金优化算法GRO 淘金优化算法(Gold rush optimizer,GRO)由Kamran Zolf于2023年提出,其灵感来自淘金热,模拟淘金者进行黄金勘探行为。淘金优化算法(Gold rush optimizer,GRO)提…...

AI Deep Reinforcement Learning Autonomous Driving(深度强化学习自动驾驶)
AI Deep Reinforcement Learning Autonomous Driving(深度强化学习自动驾驶) 背景介绍研究背景研究目的及意义项目设计内容算法介绍马尔可夫链及马尔可夫决策过程强化学习神经网络 仿真平台OpenAI gymTorcs配置GTA5 参数选择行动空间奖励函数 环境及软件…...

Java super
在Java中,关键字"super"用于引用一个类的父类。它可以有以下几种用法: 1. 访问父类成员:通过使用"super"后跟一个点,你可以从子类中访问父类的成员(方法或字段)。当子类重写一个方法或…...

【人工智能前沿弄潮】——生成式AI系列:Diffusers学习(1)了解Pipeline 、模型和scheduler
Diffusers旨在成为一个用户友好且灵活的工具箱,用于构建针对您的用例量身定制的扩散系统。工具箱的核心是模型和scheduler。虽然DiffusionPipeline为了方便起见将这些组件捆绑在一起,但您也可以拆分管道并单独使用模型和scheduler来创建新的扩散系统。 …...

TypeScript 非空断言
TypeScript 非空断言 发布于 2020-04-08 15:20:15 17.5K0 举报 一、非空断言有啥用 介绍非空断言前,先来看个示例: function sayHello(name: string | undefined) {let sname: string name; // Error } 对于以上代码,TypeScript 编译器…...

Python编程——谈谈函数的定义、调用与传入参数
作者:Insist-- 个人主页:insist--个人主页 本文专栏:Python专栏 专栏介绍:本专栏为免费专栏,并且会持续更新python基础知识,欢迎各位订阅关注。 目录 一、理解函数 二、函数的定义 1、语法 2、定义一个…...

在Ubuntu中使用Docker启动MySQL8的天坑
写在前面 简介: lower_case_table_names 是mysql设置大小写是否敏感的一个参数。 1.参数说明: lower_case_table_names0 表名存储为给定的大小和比较是区分大小写的 lower_case_table_names 1 表名存储在磁盘是小写的,但是比较的时候是不区…...

Python3.x String内置函数大全
文章目录 总结一下Python3.x字符串的常用系统函数,总共分为8类1. 大小写字母转换类的函数str.capitalize()str.title()str.lower()str.upper()str.swapcase() 2. 统计类的函数str.count(str1, beg 0,endlen(string)) 3. 匹配类的函数str.endswith(suffix, beg0, end…...

Go异常处理机制panic和recover
recover 使用panic抛出异常后, 将立即停止当前函数的执行并运行所有被defer的函数,然后将panic抛向上一层,直至程序crash。但是也可以使用被defer的recover函数来捕获异常阻止程序的崩溃,recover只有被defer后才是有意义的。 func main() { p…...

QMainwindow窗口
QMainwindow窗口 菜单栏在二级菜单中输入中文的方法给菜单栏添加相应的动作使用QMenu类的API方法添加菜单项分隔符也是QAction类 工具栏状态栏停靠窗口 菜单栏 只能有一个, 位于窗口的最上方 关于顶级菜单可以直接在UI窗口中双击, 直接输入文本信息即可, 对应子菜单项也可以通…...

P5735 【深基7.例1】距离函数
题目描述 给出平面坐标上不在一条直线上三个点坐标 ( x 1 , y 1 ) , ( x 2 , y 2 ) , ( x 3 , y 3 ) (x_1,y_1),(x_2,y_2),(x_3,y_3) (x1,y1),(x2,y2),(x3,y3),坐标值是实数,且绝对值不超过 100.00,求围成的三角形周长。保留两…...

prometheus告警发送组件部署
一、前言 要实现Prometheus的告警发送需要通过alertmanager组件,当prometheus触发告警策略时,会将告警信息发送给alertmanager,然后alertmanager根据配置的策略发送到邮件或者钉钉中,发送到钉钉需要安装额外的prometheus-webhook…...

CAPL - XML和TestModule结合实现测试项可选
目录 目的:是否想实现如下面的功能呢? 一、.can和.cin文件中函数开发...
Latex安装与环境配置(TeXlive、TeXstudio与VS code的安装)编译器+编辑器与学习应用
TeXlive 配置Tex排版系统需要安装编译器+编辑器。TeX 的源代码是后缀为 .tex 的纯文本文件。使用任意纯文本编辑器,都可以修改 .tex 文件:包括 Windows 自带的记事本程序,也包括专为 TeX 设计的编辑器(TeXworks, TeXmaker, TeXstudio, WinEdt 等),还包括一些通用的文本编…...

STM32 F103C8T6学习笔记3:串口配置—串口收发—自定义Printf函数
今日学习使用STM32 C8T6的串口,我们在经过学习笔记2的总结归纳可知,STM32 C8T6最小系统板上有三路串口,如下图: 今日我们就着手学习如何配置开通这些串口进行收发,这里不讲串口通信概念与基础,可以自行网上…...

python中字符串内建函数篇4
一、ljust() 语法:str.ljust(width,[fillchar]) 参数说明: width – 指定字符串长度。 fillchar – 填充字符,默认为空格。 返回值:返回一个原字符串左对齐,并使用空格填充至长度 width 的新字符串。如果指定的长度小于原字符串…...

并发下如何使用redis存储列表数据
1、问题 今天在工作中遇到一个问题,需要查询表A,需要根据每天所处小时所在时段,返回不同的记录给前端展示,如0-2时是在昨日0到2时生成的记录,而2-4时则是在昨日2-4时生成的记录,每条记录有一个唯一的id。表…...

Leecode螺旋矩阵 II59
59.螺旋矩阵II 题目建议: 本题关键还是在转圈的逻辑,在二分搜索中提到的区间定义,在这里又用上了。 题目链接:力扣(LeetCode)官网 - 全球极客挚爱的技术成长平台 文章讲解:代码随想录 视频…...

echarts 横向柱状图
<template><div ref"chart" style"height: 100%"></div> </template><script> import * as echarts from "echarts"; var cate ["质量通病1", "质量通病2", "质量通病3", "质…...