当前位置: 首页 > news >正文

kafka详解一

kafka详解一

1、消息引擎背景

根据维基百科的定义,消息引擎系统是一组规范。企业利用这组规范在不同系统之间传递语义准确的消息,实现松耦合的异步式数据传递.
即:系统 A 发送消息给消息引擎系统,系统 B 从消息引擎系统中读取 A 发送的消息。
消息引擎的分类:点对点模型:也叫消息队列模型。如果拿上面那个“民间版”的定义来说,那么系统 A 发送的消息只能被系统 B 接收,其他任何系统都不能读取 A 发送的消息。日常生活的例子比如电话客服就属于这种模型:同一个客户呼入电话只能被一位客服人员处理,第二个客服人员不能为该客户服务。发布 / 订阅模型:与上面不同的是,它有一个主题(Topic)的概念,你可以理解成逻辑语义相近的消息容器。该模型也有发送方和接收方,只不过提法不同。发送方也称为发布者(Publisher),接收方称为订阅者(Subscriber)。和点对点模型不同的是,这个模型可能存在多个发布者向相同的主题发送消息,而订阅者也可能存在多个,它们都能接收到相同主题的消息。生活中的报纸订阅就是一种典型的发布 / 订阅模型。
消息引擎和JMS的关系:
JMS 是 Java Message Service,它也是支持上面这两种消息引擎模型的。严格来说它并非传输协议而仅仅是一组 API 罢了。不过可能是 JMS 太有名气以至于很多主流消息引擎系统都支持 JMS 规范,比如 ActiveMQ、RabbitMQ、IBM 的 WebSphere MQ 和 Apache Kafka。当然 Kafka 并未完全遵照 JMS 规范,相反,它另辟蹊径,探索出了一条特有的道路。

2、 Kafka概述

2.1、kafka的定义:

kafka是一个分布式的、基于发布订阅模式的消息队列,主要应用于大数据实时处理领域。

PUBLISH & SUBSCRIBE
Read and write streams of data like a messaging system.PROCESS
Write scalable stream processing applications that react to events in real-time.STORE
Store streams of data safely in a distributed, replicated, fault-tolerant cluster.

2.2 为什么有消息系统

2.1.1 异步处理

异步处理:
场景说明:用户注册后,需要发注册邮件和注册短信。传统的做法有两种:串行的方式和并行方式。串行方式:将注册信息写入数据库成功后,发送注册邮件,再发送注册短信。以上三个任务全部完成后,返回给客户。

并行方式:将注册信息写入数据库成功后,发送注册邮件的同时,发送注册短信。以上三个任务完成后,返回给客户端。与串行的差别是,并行的方式可以提高处理的时间。

假设三个业务节点每个使用50毫秒钟,不考虑网络等其他开销,则串行方式的时间是150毫秒,并行的时间可能是100毫秒。
如以上案例描述,传统的方式系统的性能(并发量,吞吐量,响应时间)会有瓶颈。如何解决这个问题呢?引入消息队列:
用户的响应时间相当于是注册信息写入数据库的时间,也就是50毫秒。注册邮件,发送短信写入消息队列后,直接返回,因为写入消息队列的速度很快,基本可以忽略,因此用户的响应时间可能是50毫秒。因此架构改变后,系统的吞吐量提高到每秒20QPS。比串行提高了3倍,比并行提高了

2.1.2 解耦

场景说明:用户下单后,订单系统需要通知库存系统。传统的做法是,订单系统调用库存系统的接口。如下图:

传统模式的缺点:假如库存系统无法访问,则订单减库存将失败,从而导致订单失败,订单系统与库存系统耦合。如何解决以上问题呢?引入应用消息队列后的方案,如下图:

订单系统:用户下单后,订单系统完成持久化处理,将消息写入消息队列,返回用户订单下单成功库存系统:订阅下单的消息,采用拉/推的方式,获取下单信息,库存系统根据下单信息,进行库存操作假如:在下单时库存系统不能正常使用。也不影响正常下单,因为下单后,订单系统写入消息队列就不再关心其他的后续操作了。实现订单系统与库存系统的应用解耦。

2.1.3 流量削峰

流量削锋也是消息队列中的常用场景,一般在秒杀或团抢活动中使用广泛!应用场景:秒杀活动,一般会因为流量过大,导致流量暴增,应用挂掉。为解决这个问题,一般需要在应用前端加入消息队列。可以控制活动的人数,可以缓解短时间内高流量压垮应用。用户的请求,服务器接收后,首先写入消息队列。假如消息队列长度超过最大数量,则直接抛弃用户请求或跳转到错误页面。秒杀业务根据消息队列中的请求信息,再做后续处理。

2.1.4 消息队列其它用处

解耦
允许你独立的扩展或修改两边的处理过程,只要确保它们遵守同样的接口约束。

冗余
消息队列把数据进行持久化直到它们已经被完全处理,通过这一方式规避了数据丢失风险。许多消息队列所采用的"插入-获取-删除"范式中,在把一个消息从队列中删除之前,需要你的处理系统明确的指出该消息已经被处理完毕,从而确保你的数据被安全的保存直到你使用完毕。

扩展性
因为消息队列解耦了你的处理过程,所以增大消息入队和处理的频率是很容易的,只要另外增加处理过程即可。

灵活性 & 峰值处理能力
在访问量剧增的情况下,应用仍然需要继续发挥作用,但是这样的突发流量并不常见。如果为以能处理这类峰值访问为标准来投入资源随时待命无疑是巨大的浪费。使用消息队列能够使关键组件顶住突发的访问压力,而不会因为突发的超负荷的请求而完全崩溃。

可恢复性
系统的一部分组件失效时,不会影响到整个系统。消息队列降低了进程间的耦合度,所以即使一个处理消息的进程挂掉,加入队列中的消息仍然可以在系统恢复后被处理。

顺序保证
在大多使用场景下,数据处理的顺序都很重要。大部分消息队列本来就是排序的,并且能保证数据会按照特定的顺序来处理。(Kafka 保证一个 Partition 内的消息的有序性)

缓冲
有助于控制和优化数据流经过系统的速度,解决生产消息和消费消息的处理速度不一致的情况。

异步通信
很多时候,用户不想也不需要立即处理消息。消息队列提供了异步处理机制,允许用户把一个消息放入队列,但并不立即处理它。想向队列中放入多少消息就放多少,然后在需要的时候再去处理它们。

2.3、 Kafka核心概念

	Kafka是最初由Linkedin公司开发,是一个分布式、分区的、多副本的、多订阅者,基于zookeeper协调的分布式日志系统(也可以当做MQ系统),常见可以用于web/nginx日志、访问日志,消息服务等等,Linkedin于2010年贡献给了Apache基金会并成为顶级开源项目。kafka是一个分布式消息队列。具有高性能、持久化、多副本备份、横向扩展能力。生产者往队列里写消息,消费者从队列里取消息进行业务逻辑。Kafka就是一种发布-订阅模式。将消息保存在磁盘中,以顺序读写方式访问磁盘,避免随机读写导致性能瓶颈。

2.4、 kafka特性

  • 高吞吐、低延迟

    kakfa 最大的特点就是收发消息非常快,kafka 每秒可以处理几十万条消息,它的最低延迟只有几毫秒。
    
  • 高伸缩性

     每个主题(topic) 包含多个分区(partition),主题中的分区可以分布在不同的主机(broker)中。
    
  • 持久性、可靠性

    Kafka 能够允许数据的持久化存储,消息被持久化到磁盘,并支持数据备份防止数据丢失。
    
  • 容错性

     允许集群中的节点失败,某个节点宕机,Kafka 集群能够正常工作。
    
  • 高并发

    支持数千个客户端同时读写。
    

2.5、kafka核心模块解析

1、生产者API

允许应用程序发布记录流至一个或者多个kafka的主题(topics)。

2、消费者API

允许应用程序订阅一个或者多个主题,并处理这些主题接收到的记录流。

3、StreamsAPI

允许应用程序充当流处理器(stream processor),从一个或者多个主题获取输入流,并生产一个输出流到一个或 者多个主题,能够有效的变化输入流为输出流。

4、ConnectAPI

允许构建和运行可重用的生产者或者消费者

2.6、 Kafka集群架构

  • producer

     消息生产者,发布消息到Kafka集群的终端或服务
    
  • broker

    Kafka集群中包含的服务器,一个borker就表示kafka集群中的一个节点
    
  • topic

    每条发布到Kafka集群的消息属于的类别,即Kafka是面向 topic 的。
    更通俗的说Topic就像一个消息队列,生产者可以向其写入消息,消费者可以从中读取消息,一个Topic支持多个生产者或消费者同时订阅它,所以其扩展性很好。
    
  • partition

每个 topic 包含一个或多个partition。Kafka分配的单位是partition
  • replication

    partition的副本,保障 partition 的高可用。
    
  • consumer

    从Kafka集群中消费消息的终端或服务
    
  • consumer group

    每个 consumer 都属于一个 consumer group,每条消息只能被 consumer group 中的一个 Consumer 消费,但可以被多个 consumer group 消费。
    
  • leader

    每个partition有多个副本,其中有且仅有一个作为Leader,Leader是当前负责数据的读写的partition。 producer 和 consumer 只跟 leader 交互
    
  • follower

    Follower跟随Leader,所有写请求都通过Leader路由,数据变更会广播给所有Follower,Follower与Leader保持数据同步。如果Leader失效,则从Follower中选举出一个新的Leader。
    
  • controller

    	知道大家有没有思考过一个问题,就是Kafka集群中某个broker宕机之后,是谁负责感知到他的宕机,以及负责进行Leader Partition的选举?如果你在Kafka集群里新加入了一些机器,此时谁来负责把集群里的数据进行负载均衡的迁移?包括你的Kafka集群的各种元数据,比如说每台机器上有哪些partition,谁是leader,谁是follower,是谁来管理的?如果你要删除一个topic,那么背后的各种partition如何删除,是谁来控制?还有就是比如Kafka集群扩容加入一个新的broker,是谁负责监听这个broker的加入?如果某个broker崩溃了,是谁负责监听这个broker崩溃?这里就需要一个Kafka集群的总控组件,Controller。他负责管理整个Kafka集群范围内的各种东西。
  • zookeeper

    (1)	Kafka 通过 zookeeper 来存储集群的meta元数据信息
    (2)一旦controller所在broker宕机了,此时临时节点消失,集群里其他broker会一直监听这个临时节点,发现临时节点消失了,就争抢再次创建临时节点,保证有一台新的broker会成为controller角色。
    
  • offset

    • 偏移量
    消费者在对应分区上已经消费的消息数(位置),offset保存的地方跟kafka版本有一定的关系。
    kafka0.8 版本之前offset保存在zookeeper上。
    kafka0.8 版本之后offset保存在kafka集群上。它是把消费者消费topic的位置通过kafka集群内部有一个默认的topic,名称叫 __consumer_offsets,它默认有50个分区。
    
  • ISR机制

    	光是依靠多副本机制能保证Kafka的高可用性,但是能保证数据不丢失吗?不行,因为如果leader宕机,但是leader的数据还没同步到follower上去,此时即使选举了follower作为新的leader,当时刚才的数据已经丢失了。ISR是:in-sync replica,就是跟leader partition保持同步的follower partition的数量,只有处于ISR列表中的follower才可以在leader宕机之后被选举为新的leader,因为在这个ISR列表里代表他的数据跟leader是同步的。
    

3. kafka集群安装部署

  • 1、下载安装包(http://kafka.apache.org)

    kafka_2.11-1.1.0.tgz
    
  • 2、规划安装目录

    /opt/install
    
  • 3、上传安装包到node01服务器,并解压

    通过FTP工具上传安装包到node01服务器的/opt/soft路径下,然后进行解压
    cd /opt/soft/
    tar -zxf kafka_2.11-1.1.0.tgz -C /opt/install/
  • 4、修改配置文件

    • 在node01上修改

      • 进入到kafka安装目录下有一个config目录,进行修改配置文件

        • node01执行以下命令进行修改配置文件
        cd  /opt/install/kafka_2.11-1.1.0/config
        vim server.properties#指定kafka对应的broker id ,唯一
        broker.id=0
        #指定数据存放的目录
        log.dirs=/opt/install/kafka_2.11-1.1.0/logs
        #指定zk地址
        zookeeper.connect=node01:2181,node02:2181,node03:2181
        #指定是否可以删除topic ,默认是false 表示不可以删除
        delete.topic.enable=true
        #指定broker主机名
        host.name=node01
  • 5、node01执行以下命令分发kafka安装目录到其他节点

    cd /opt/install/
    scp -r kafka_2.11-1.1.0/ node02:$PWD
    scp -r kafka_2.11-1.1.0/ node03:$PWD
    
  • 6、修改node02和node03上的配置

    • node02执行以下命令进行修改配置

      cd /opt/install/kafka_2.11-1.1.0/config/
      vi server.properties#指定kafka对应的broker id ,唯一
      broker.id=1
      #指定数据存放的目录
      log.dirs=/opt/install/kafka_2.11-1.1.0/logs
      #指定zk地址
      zookeeper.connect=node01:2181,node02:2181,node03:2181
      #指定是否可以删除topic ,默认是false 表示不可以删除
      delete.topic.enable=true
      #指定broker主机名
      host.name=node02
      
    
    
  • node03执行以下命令进行修改配置

    cd /opt/install/kafka_2.11-1.1.0/config/vi server.properties#指定kafka对应的broker id ,唯一broker.id=2#指定数据存放的目录log.dirs=/opt/install/kafka_2.11-1.1.0/logs#指定zk地址zookeeper.connect=node01:2181,node02:2181,node03:2181#指定是否可以删除topic ,默认是false 表示不可以删除delete.topic.enable=true#指定broker主机名host.name=node03
    

3.1、 kafka集群启动和停止

3.1.1、 启动

  • 先启动zk集群

  • 然后在所有节点执行脚本

    cd /opt/install/kafka_2.11-1.1.0/
    nohup bin/kafka-server-start.sh config/server.properties 2>&1 & 
  • 一键启动kafka

    • start_kafka.sh

      #!/bin/sh
      for host in node01 node02 node03
      dossh $host "source /etc/profile;nohup /opt/install/kafka_2.11-1.1.0/bin/kafka-server-start.sh /opt/install/kafka_2.11-1.1.0/config/server.properties >/dev/null 2>&1 &"echo "$host kafka is running"done
      

3.2.1、 停止

  • 所有节点执行关闭kafka脚本

    cd /opt/install/kafka_2.11-1.1.0/
    bin/kafka-server-stop.sh 
    
  • 一键停止kafka

    • stop_kafka.sh

      #!/bin/sh
      for host in node01 node02 node03
      dossh $host "source /etc/profile;nohup /opt/install/kafka_2.11-1.1.0/bin/kafka-server-stop.sh &" echo "$host kafka is stopping"
      done
      

3.3.1、 一键启动和停止脚本

  • kafkaCluster.sh

    #!/bin/sh
    case $1 in 
    "start"){
    for host in node01 node02 node03 
    dossh $host "source /etc/profile; nohup /opt/install/kafka_2.11-1.1.0/bin/kafka-server-start.sh /opt/install/kafka_2.11-1.1.0/config/server.properties > /dev/null 2>&1 &"   echo "$host kafka is running..."  
    done  
    };;"stop"){
    for host in node01 node02 node03 
    dossh $host "source /etc/profile; nohup /opt/install/kafka_2.11-1.1.0/bin/kafka-server-stop.sh >/dev/null  2>&1 &"   echo "$host kafka is stopping..."  
    done
    };;
    esac
    
  • 启动

    sh kafkaCluster.sh start
    
  • 停止

    sh kafkaCluster.sh stop
    

4、 kafka的命令行的管理使用

  • 1、创建topic

    • kafka-topics.sh

    • node01执行以下命令创建topic

      cd /opt/install/kafka_2.11-1.1.0/
      bin/kafka-topics.sh --create --partitions 3 --replication-factor 2 --topic test --zookeeper node01:2181,node02:2181,node03:2181
      
  • 2、查询所有的topic

    • kafka-topics.sh

      cd /opt/install/kafka_2.11-1.1.0/
      bin/kafka-topics.sh --list --zookeeper node01:2181,node02:2181,node03:2181 
      
  • 3、查看topic的描述信息

    • kafka-topics.sh

      cd /opt/install/kafka_2.11-1.1.0/
      bin/kafka-topics.sh --describe --topic test --zookeeper node01:2181,node02:2181,node03:2181  
      
  • 4、删除topic

    • kafka-topics.sh

      cd /opt/install/kafka_2.11-1.1.0/
      bin/kafka-topics.sh --delete --topic test --zookeeper node01:2181,node02:2181,node03:2181 
      
  • 5、node01模拟生产者写入数据到topic中

    ​ node01执行以下命令,模拟生产者写入数据到kafka当中去

    cd /opt/install/kafka_2.11-1.1.0/
    bin/kafka-console-producer.sh --broker-list node01:9092,node02:9092,node03:9092 --topic test 
    
  • 6、node01模拟消费者拉取topic中的数据

    ​ node02执行以下命令,模拟消费者消费kafka当中的数据

    cd /opt/install/kafka_2.11-1.1.0/
    bin/kafka-console-consumer.sh --zookeeper node01:2181,node02:2181,node03:2181 --topic test --from-beginning
    它会把消息的偏移量保存在zk上或者cd /opt/install/kafka_2.11-1.1.0/
    bin/kafka-console-consumer.sh --bootstrap-server node01:9092,node02:9092,node03:9092 --topic test --from-beginning
    它会把消息的偏移量保存在kafka集群内置的topic中
    

7、任意kafka服务器执行以下命令可以增加topic分区数

cd /opt/install/kafka_2.11-1.1.0bin/kafka-topics.sh --zookeeper zkhost:port --alter --topic topicName --partitions 8

5、kafka的生产者和消费者api代码开发

5.1 生产者代码开发

  • 创建maven工程引入依赖

     <dependencies><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>1.0.1</version></dependency><!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka --><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka_2.11</artifactId><version>1.1.0</version></dependency></dependencies><build><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><version>3.0</version><configuration><source>1.8</source><target>1.8</target><encoding>UTF-8</encoding><!--    <verbal>true</verbal>--></configuration></plugin><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-shade-plugin</artifactId><version>2.4.3</version><executions><execution><phase>package</phase><goals><goal>shade</goal></goals><configuration><filters><filter><artifact>*:*</artifact><excludes><exclude>META-INF/*.SF</exclude><exclude>META-INF/*.DSA</exclude><exclude>META-INF/*.RSA</exclude></excludes></filter></filters><transformers><transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"><mainClass></mainClass></transformer></transformers></configuration></execution></executions></plugin></plugins></build>
    
  • 代码开发

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;import java.util.Properties;public class KafkaProducerStudy {/*** 通过javaAPI实现向kafka当中生产数据* @param args*/public static void main(String[] args) {Properties props = new Properties();props.put("bootstrap.servers", "node01:9092,node02:9092,node03:9092");//消息的确认机制props.put("acks", "all");props.put("retries", 0);//缓冲区的大小  //默认32Mprops.put("buffer.memory", 33554432);//批处理数据的大小,每次写入多少数据到topic   //默认16KBprops.put("batch.size", 16384);//可以延长多久发送数据   //默认为0 表示不等待 ,立即发送props.put("linger.ms", 1);props.put("buffer.memory", 33554432);//指定数据序列化和反序列化props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");KafkaProducer<String, String> producer = new KafkaProducer<String,String>(props);for(int i =0;i<100;i++){//既没有指定分区号,也没有数据的key,直接使用轮序的方式将数据发送到各个分区里面去ProducerRecord record = new ProducerRecord("test", "helloworld" + i);producer.send(record);}//关闭消息发送客户端producer.close();}
    }
    

5.2 消费者代码开发

5.2.1、自动提交偏移量代码开发

package com.xichuan.consumer;import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;import java.util.Arrays;
import java.util.Properties;//todo:需求:开发kafka消费者代码(自动提交偏移量)
public class KafkaConsumerStudy {public static void main(String[] args) {//准备配置属性Properties props = new Properties();//kafka集群地址props.put("bootstrap.servers", "node01:9092,node02:9092,node03:9092");//消费者组idprops.put("group.id", "consumer-test");//允许自动提交偏移量props.put("enable.auto.commit", "true");//自动提交偏移量的时间间隔props.put("auto.commit.interval.ms", "1000");//默认是latest//earliest: 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费//latest: 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据//none : topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常props.put("auto.offset.reset","earliest");props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);//指定消费哪些topicconsumer.subscribe(Arrays.asList("test"));while (true) {//不断的拉取数据ConsumerRecords<String, String> records = consumer.poll(100);for (ConsumerRecord<String, String> record : records) {//该消息所在的分区号int partition = record.partition();//该消息对应的keyString key = record.key();//该消息对应的偏移量long offset = record.offset();//该消息内容本身String value = record.value();System.out.println("partition:"+partition+"\t key:"+key+"\toffset:"+offset+"\tvalue:"+value);}}}
}

5.2.2、手动提交偏移量代码开发

package com.xichuan.consumer;import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;//todo:需求:开发kafka消费者代码(手动提交偏移量)
public class KafkaConsumerControllerOffset {public static void main(String[] args) {Properties props = new Properties();props.put("bootstrap.servers", "node01:9092,node02:9092,node03:9092");props.put("group.id", "controllerOffset");//关闭自动提交,改为手动提交偏移量props.put("enable.auto.commit", "false");props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);//指定消费者要消费的topicconsumer.subscribe(Arrays.asList("test"));//定义一个数字,表示消息达到多少后手动提交偏移量final int minBatchSize = 20; //条  20*16k//定义一个数组,缓冲一批数据List<ConsumerRecord<String, String>> buffer = new ArrayList<ConsumerRecord<String, String>>();while (true) {ConsumerRecords<String, String> records = consumer.poll(100);//100毫秒超时时间for (ConsumerRecord<String, String> record : records) {buffer.add(record);}if (buffer.size() >= minBatchSize) {//insertIntoDb(buffer);  拿到数据之后,进行消费System.out.println("缓冲区的数据条数:"+buffer.size());System.out.println("我已经处理完这一批数据了...");consumer.commitSync();//手动提交buffer.clear();}}}
}

5.2.3、指定分区数据进行消费

因为每个topic都可能有多个分区,所以我们也可以针对指定的分区进行消费

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;import java.util.Arrays;
import java.util.Properties;public class ConsumPartition {public static void main(String[] args) {Properties props= new Properties();//指定kafka的broker的通信地址props.put("bootstrap.servers","localhost:9092"); props.put("group.id", "test");props.put("enable.auto.commit","true");props.put("auto.commit.interval.ms","1000");props.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");props.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");KafkaConsumer<String,String> consumer = new KafkaConsumer<>(props);String topic ="foo";TopicPartition partition0 = new TopicPartition(topic, 0);TopicPartition partition1 = new TopicPartition(topic, 1);//通过assign来注册仅仅消费某些分区的数据consumer.assign(Arrays.asList(partition0, partition1));
//手动指定消费指定分区的数据---endwhile (true) {ConsumerRecords<String,String> records = consumer.poll(100);for(ConsumerRecord<String, String> record : records)System.out.printf("offset= %d, key = %s, value = %s%n", record.offset(), record.key(),record.value());}}
}

6、kafka的分区策略

6.1、 分区的概念

Kafka 有主题(Topic)的概念,它是承载真实数据的逻辑容器,而在主题之下还分为若干个分区,也就是说 Kafka 的消息组织方式实际上是三级结构:主题 - 分区 - 消息。主题下的每条消息只会保存在某一个分区中,而不会在多个分区中被保存多份.

对数据进行分区的主要原因,就是为了实现系统的高伸缩性(Scalability)。不同的分区能够被放置到不同节点的机器上,而数据的读写操作也都是针对分区这个粒度而进行的,这样每个节点的机器都能独立地执行各自分区的读写请求处理。并且,我们还可以通过添加新的节点机器来增加整体系统的吞吐量。比如在 Kafka 中叫分区,在 MongoDB 和 Elasticsearch 中就叫分片 Shard,而在 HBase 中则叫 Region,在 Cassandra 中又被称作 vnode。从表面看起来它们实现原理可能不尽相同,但对底层分区(Partitioning)的整体思想却从未改变。除了提供负载均衡这种最核心的功能之外,利用分区也可以实现其他一些业务级别的需求,比如实现业务级别的消息顺序的问题。

6.2、 分区策略

所谓分区策略是决定生产者将消息发送到哪个分区的算法。Kafka 为我们提供了默认的分区策略,同时它也支持你自定义分区策略.

public interface Partitioner extends Configurable, Closeable {/*** Compute the partition for the given record.** @param topic The topic name* @param key The key to partition on (or null if no key)* @param keyBytes The serialized key to partition on( or null if no key)* @param value The value to partition on or null* @param valueBytes The serialized value to partition on or null* @param cluster The current cluster metadata*/public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster);/*** This is called when partitioner is closed.*/public void close();}
1.2.1 轮训策略

也称 Round-robin 策略,即顺序分配。比如一个主题下有 3 个分区,那么第一条消息被发送到分区 0,第二条被发送到分区 1,第三条被发送到分区 2,以此类推。当生产第 4 条消息时又会重新开始,即将其分配到分区 0,就像下面这张图展示的那样。

这就是所谓的轮询策略。轮询策略是 Kafka Java 生产者 API 默认提供的分区策略。如果你未指定partitioner.class参数,那么你的生产者程序会按照轮询的方式在主题的所有分区间均匀地“码放”消息。

轮询策略有非常优秀的负载均衡表现,它总是能保证消息最大限度地被平均分配到所有分区上,故默认情况下它是最合理的分区策略,也是我们最常用的分区策略之一。

1.2.2 随机策略

也称 Randomness 策略。所谓随机就是我们随意地将消息放置到任意一个分区上,如下面这张图所示。

List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
return ThreadLocalRandom.current().nextInt(partitions.size());

本质上看随机策略也是力求将数据均匀地打散到各个分区,但从实际表现来看,它要逊于轮询策略,所以如果追求数据的均匀分布,还是使用轮询策略比较好。事实上,随机策略是老版本生产者使用的分区策略,在新版本中已经改为轮询了。

1.2.3 按照消息key保存

Kafka 允许为每条消息定义消息键,简称为 Key。这个 Key 的作用非常大,它可以是一个有着明确业务含义的字符串,比如客户代码、部门编号或是业务 ID 等;也可以用来表征消息元数据。特别是在 Kafka 不支持时间戳的年代,在一些场景中,工程师们都是直接将消息创建时间封装进 Key 里面的。一旦消息被定义了 Key,那么你就可以保证同一个 Key 的所有消息都进入到相同的分区里面,由于每个分区下的消息处理都是有顺序的,故这个策略被称为按消息键保序策略,如下图所示。

List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
return Math.abs(key.hashCode()) % partitions.size();

Kafka 默认分区策略实际上同时实现了两种策略:如果指定了 Key,那么默认实现按消息键保序策略;如果没有指定 Key,则使用轮询策略。

1.2.4 基于地理位置的分区策略

这种策略一般只针对那些大规模的 Kafka 集群,特别是跨城市、跨国家甚至是跨大洲的集群。

List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
return partitions.stream().filter(p -> isSouth(p.leader().host())).map(PartitionInfo::partition).findAny().get();

可以从所有分区中找出那些 Leader 副本在南方的所有分区,然后随机挑选一个进行消息发送。

1.3 用户自定义分区
kafka的分区策略决定了producer生产者产生的一条消息最后会写入到topic的哪一个分区中
/*** Creates a record with a specified timestamp to be sent to a specified topic and partition* * @param topic The topic the record will be appended to* @param partition The partition to which the record should be sent* @param timestamp The timestamp of the record, in milliseconds since epoch. If null, the producer will assign*                  the timestamp using System.currentTimeMillis().* @param key The key that will be included in the record* @param value The record contents* @param headers the headers that will be included in the record*/public ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value, Iterable<Header> headers) {if (topic == null)throw new IllegalArgumentException("Topic cannot be null.");if (timestamp != null && timestamp < 0)throw new IllegalArgumentException(String.format("Invalid timestamp: %d. Timestamp should always be non-negative or null.", timestamp));if (partition != null && partition < 0)throw new IllegalArgumentException(String.format("Invalid partition: %d. Partition number should always be non-negative or null.", partition));this.topic = topic;this.partition = partition;this.key = key;this.value = value;this.timestamp = timestamp;this.headers = new RecordHeaders(headers);}
  • 1、指定具体的分区号
//1、给定具体的分区号,数据就会写入到指定的分区中
producer.send(new ProducerRecord<String, String>("test", 0,Integer.toString(i), "hello-kafka-"+i));
  • 2、不给定具体的分区号,给定key的值(key不断变化)
//2、不给定具体的分区号,给定一个key值, 这里使用key的 hashcode%分区数=分区号
producer.send(new ProducerRecord<String, String>("test", Integer.toString(i), "hello-kafka-"+i));
  • 3、不给定具体的分区号,也不给对应的key
//3、不给定具体的分区号,也不给定对应的key ,这个它会进行轮训的方式把数据写入到不同分区中
producer.send(new ProducerRecord<String, String>("test", "hello-kafka-"+i));
  • 4、自定义分区

    • 定义一个类实现接口Partitioner
    package com.xichuan.partitioner;import org.apache.kafka.clients.producer.Partitioner;
    import org.apache.kafka.common.Cluster;import java.util.Map;//todo:需求:自定义kafka的分区函数
    public class MyPartitioner implements Partitioner{/*** 通过这个方法来实现消息要去哪一个分区中* @param topic* @param key* @param bytes* @param value* @param bytes1* @param cluster* @return*/public int partition(String topic, Object key, byte[] bytes, Object value, byte[] bytes1, Cluster cluster) {//获取topic分区数int partitions = cluster.partitionsForTopic(topic).size();//key.hashCode()可能会出现负数 -1 -2 0 1 2//Math.abs 取绝对值return Math.abs(key.hashCode()% partitions);}public void close() {}public void configure(Map<String, ?> map) {}
    }
    • 配置自定义分区类
    //在Properties对象中添加自定义分区类
    props.put("partitioner.class","com.xichuan.partitioner.MyPartitioner");
    

​ 分区是实现负载均衡以及高吞吐量的关键,故在生产者这一端就要仔细盘算合适的分区策略,避免造成消息数据的“倾斜”,使得某些分区成为性能瓶颈,这样极易引发下游数据消费的性能下降

7、kafka 消息压缩

压缩就是用时间去换空间的经典 trade-off 思想,具体来说就是用 CPU 时间去换磁盘空间或网络 I/O 传输量,希望以较小的 CPU 开销带来更少的磁盘占用或更少的网络 I/O 传输。在 Kafka 中,压缩也是用来做这件事的。

7.1 何时压缩

在 Kafka 中,压缩可能发生在两个地方:生产者端和 Broker 端。

生产者程序中配置 compression.type 参数即表示启用指定类型的压缩算法。比如下面这段程序代码展示了如何构建一个开启 GZIP 的 Producer 对象:

Properties props = new Properties();props.put("bootstrap.servers", "localhost:9092");props.put("acks", "all");props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");// 开启 GZIP 压缩props.put("compression.type", "gzip");Producer<String, String> producer = new KafkaProducer<>(props);

这里比较关键的代码行是 props.put(“compression.type”, “gzip”),它表明该 Producer 的压缩算法使用的是 GZIP。这样 Producer 启动后生产的每个消息集合都是经 GZIP 压缩过的,故而能很好地节省网络传输带宽以及 Kafka Broker 端的磁盘占用。

 Broker 端也有一个参数叫 compression.type,和上面那个例子中的同名。但是这个参数的默认值是 producer,这表示 Broker 端会“尊重”Producer 端使用的压缩算法。可一旦你在 Broker 端设置了不同的 compression.type 值,就一定要小心了,因为可能会发生预料之外的压缩 / 解压缩操作,通常表现为 Broker 端 CPU 使用率飙升。

7.2 何时解压缩

通常来说解压缩发生在消费者程序中,也就是说 Producer 发送压缩消息到 Broker 后,Broker 照单全收并原样保存起来。当 Consumer 程序请求这部分消息时,Broker 依然原样发送出去,当消息到达 Consumer 端后,由 Consumer 自行解压缩还原成之前的消息。

那么现在问题来了,Consumer 怎么知道这些消息是用何种压缩算法压缩的呢?其实答案就在消息中。Kafka 会将启用了哪种压缩算法封装进消息集合中,这样当 Consumer 读取到消息集合时,它自然就知道了这些消息使用的是哪种压缩算法。如果用一句话总结一下压缩和解压缩,那么我希望你记住这句话:Producer 端压缩、Broker 端保持、Consumer 端解压缩。

除了在 Consumer 端解压缩,Broker 端也会进行解压缩。每个压缩过的消息集合在 Broker 端写入时都要发生解压缩操作,目的就是为了对消息执行各种验证。我们必须承认这种解压缩对 Broker 端性能是有一定影响的,特别是对 CPU 的使用率而言。

7.3 各种压缩算法对比

在实际使用中,GZIP、Snappy、LZ4 甚至是 zstd 的表现各有千秋。但对于 Kafka 而言,它们的性能测试结果却出奇得一致,即在吞吐量方面:LZ4 > Snappy > zstd 和 GZIP;而在压缩比方面,zstd > LZ4 > GZIP > Snappy。具体到物理资源,使用 Snappy 算法占用的网络带宽最多,zstd 最少,这是合理的,毕竟 zstd 就是要提供超高的压缩比;在 CPU 使用率方面,各个算法表现得差不多,只是在压缩时 Snappy 算法使用的 CPU 较多一些,而在解压缩时 GZIP 算法则可能使用更多的 CPU。

7.4 最佳实践

1、Producer 端完成的压缩,那么启用压缩的一个条件就是 Producer 程序运行机器上的 CPU 资源要很充足。如果 Producer 运行机器本身 CPU 已经消耗殆尽了,那么启用消息压缩无疑是雪上加霜,只会适得其反2、除了 CPU 资源充足这一条件,如果你的环境中带宽资源有限,那么也建议你开启压缩。如果客户端机器 CPU 资源有很多富余,我强烈建议你开启 zstd 压缩,这样能极大地节省网络资源消耗。3、解压缩端,尽量保证不要出现消息格式转换的情况

8、消费者组

8.1 消费者组的基本概念

Consumer Group 是 Kafka 提供的可扩展且具有容错性的消费者机制

消费者组内必然可以有多个消费者或消费者实例(Consumer Instance),它们共享一个公共的 ID,这个 ID 被称为 Group ID。组内的所有消费者协调在一起来消费订阅主题(Subscribed Topics)的所有分区(Partition)。当然,每个分区只能由同一个消费者组内的一个 Consumer 实例来消费。

1、Consumer Group 下可以有一个或多个 Consumer 实例。这里的实例可以是一个单独的进程,也可以是同一进程下的线程。在实际场景中,使用进程更为常见一些。
2、Group ID 是一个字符串,在一个 Kafka 集群中,它标识唯一的一个 Consumer Group。
3、Consumer Group 下所有实例订阅的主题的单个分区,只能分配给组内的某个 Consumer 实例消费。这个分区当然也可以被其他的 Group 消费。

Kafka 仅仅使用 Consumer Group 这一种机制,却同时实现了传统消息引擎系统的两大模型(P2P/PubSub):如果所有实例都属于同一个 Group,那么它实现的就是消息队列模型;如果所有实例分别属于不同的 Group,那么它实现的就是发布 / 订阅模型。

在实际使用场景中,怎么知道一个 Group 下该有多少个 Consumer 实例呢?理想情况下,Consumer 实例的数量应该等于该 Group 订阅主题的分区总数。

举个简单的例子,假设一个 Consumer Group 订阅了 3 个主题,分别是 A、B、C,它们的分区数依次是 1、2、3,那么通常情况下,为该 Group 设置 6 个 Consumer 实例是比较理想的情形,因为它能最大限度地实现高伸缩性。

针对 Consumer Group,Kafka 是怎么管理位移的呢?

老版本的 Consumer Group 把位移保存在 ZooKeeper 中。Apache ZooKeeper 是一个分布式的协调服务框架,Kafka 重度依赖它实现各种各样的协调管理。将位移保存在 ZooKeeper 外部系统的做法,最显而易见的好处就是减少了 Kafka Broker 端的状态保存开销。现在比较流行的提法是将服务器节点做成无状态的,这样可以自由地扩缩容,实现超强的伸缩性。Kafka 最开始也是基于这样的考虑,才将 Consumer Group 位移保存在独立于 Kafka 集群之外的框架中。不过,慢慢地人们发现了一个问题,即 ZooKeeper 这类元框架其实并不适合进行频繁的写更新,而 Consumer Group 的位移更新却是一个非常频繁的操作。这种大吞吐量的写操作会极大地拖慢 ZooKeeper 集群的性能,因此 Kafka 社区渐渐有了这样的共识:将 Consumer 位移保存在 ZooKeeper 中是不合适的做法。在新版本的 Consumer Group 中,Kafka 社区重新设计了 Consumer Group 的位移管理方式,采用了将位移保存在 Kafka 内部主题的方法。即:__consumer_offsets。

8.2 consumer Group rebalance

8.2.1 rebalance介绍

Rebalance 本质上是一种协议,规定了一个 Consumer Group 下的所有 Consumer 如何达成一致,来分配订阅 Topic 的每个分区。比如某个 Group 下有 20 个 Consumer 实例,它订阅了一个具有 100 个分区的 Topic。正常情况下,Kafka 平均会为每个 Consumer 分配 5 个分区。这个分配的过程就叫 Rebalance。

那么 Consumer Group 何时进行 Rebalance 呢?Rebalance 的触发条件有 3 个。1、组成员数发生变更。比如有新的 Consumer 实例加入组或者离开组,抑或是有 Consumer 实例崩溃被“踢出”组。
2、订阅主题数发生变更。Consumer Group 可以使用正则表达式的方式订阅主题,比如 consumer.subscribe(Pattern.compile(“t.*c”)) 就表明该 Group 订阅所有以字母 t 开头、字母 c 结尾的主题。在 Consumer Group 的运行过程中,你新创建了一个满足这样条件的主题,那么该 Group 就会发生 Rebalance。
3、订阅主题的分区数发生变更。Kafka 当前只能允许增加一个主题的分区数。当分区数增加时,就会触发订阅该主题的所有 Group 开启 Rebalance。

举个简单的例子来说明一下 Consumer Group 发生 Rebalance 的过程。假设目前某个 Consumer Group 下有两个 Consumer,比如 A 和 B,当第三个成员 C 加入时,Kafka 会触发 Rebalance,并根据默认的分配策略重新为 A、B 和 C 分配分区,如下图所示:

Rebalance 之后的分配依然是公平的,即每个 Consumer 实例都获得了 3 个分区的消费权。这是我们希望出现的情形。

8.2.2 rebalance的问题

1、Rebalance 过程对 Consumer Group 消费过程有极大的影响。如果你了解 JVM 的垃圾回收机制,你一定听过万物静止的收集方式,即著名的 stop the world,简称 STW。在 STW 期间,所有应用线程都会停止工作,表现为整个应用程序僵在那边一动不动。Rebalance 过程也和这个类似,在 Rebalance 过程中,所有 Consumer 实例都会停止消费,等待 Rebalance 完成。这是 Rebalance 为人诟病的一个方面。

2、目前 Rebalance 的设计是所有 Consumer 实例共同参与,全部重新分配所有分区。其实更高效的做法是尽量减少分配方案的变动。例如实例 A 之前负责消费分区 1、2、3,那么 Rebalance 之后,如果可能的话,最好还是让实例 A 继续消费分区 1、2、3,而不是被重新分配其他的分区。这样的话,实例 A 连接这些分区所在 Broker 的 TCP 连接就可以继续用,不用重新创建连接其他 Broker 的 Socket 资源。

3、Rebalance 实在是太慢了。曾经,有个国外用户的 Group 内有几百个 Consumer 实例,成功 Rebalance 一次要几个小时!这完全是不能忍受的。最悲剧的是,目前社区对此无能为力,至少现在还没有特别好的解决方案。所谓“本事大不如不摊上”,也许最好的解决方案就是避免 Rebalance 的发生吧。

相关文章:

kafka详解一

kafka详解一 1、消息引擎背景 根据维基百科的定义&#xff0c;消息引擎系统是一组规范。企业利用这组规范在不同系统之间传递语义准确的消息&#xff0c;实现松耦合的异步式数据传递. 即&#xff1a;系统 A 发送消息给消息引擎系统&#xff0c;系统 B 从消息引擎系统中读取 A…...

Flutter yuv 转 rgb

1、引用yuv_converter库 yuv_converter: ^0.0.1 2、导入头文件&#xff1a; import package:yuv_converter/yuv_converter.dart;3、yuv转rgb YuvConverter.yuv420NV21ToRgba8888(yuvRawData, 512, 512) 根据yuv格式选择不同的api。 举个例子&#xff1a; void initState() …...

MySQL——子查询

2023.9.8 相关学习笔记&#xff1a; #子查询 /* 含义&#xff1a; 出现在其他语句中的select语句&#xff0c;称为子查询或内查询 外部的查询语句&#xff0c;称为主查询或外查询分类&#xff1a; 按子查询出现的位置&#xff1a;select后面&#xff1a;仅仅支持标量子查询fro…...

Java学习笔记---多态

面向对象三大特征之一&#xff08;继承&#xff0c;封装&#xff0c;多态&#xff09; 多态的应用场景&#xff1a;根据传递对象的不同&#xff0c;调用不同的show方法 一、多态的定义 同类型的对象&#xff0c;表现出的不同形态&#xff08;对象的多种形态&#xff09; 二…...

2023-09-10 LeetCode每日一题(课程表 II)

2023-09-10每日一题 一、题目编号 210. 课程表 II二、题目链接 点击跳转到题目位置 三、题目描述 现在你总共有 numCourses 门课需要选&#xff0c;记为 0 到 numCourses - 1。给你一个数组 prerequisites &#xff0c;其中 prerequisites[i] [ai, bi] &#xff0c;表示在…...

合并区间【贪心算法】

合并区间 以数组 intervals 表示若干个区间的集合&#xff0c;其中单个区间为 intervals[i] [starti, endi] 。请你合并所有重叠的区间&#xff0c;并返回 一个不重叠的区间数组&#xff0c;该数组需恰好覆盖输入中的所有区间 。 class Solution {public int[][] merge(int[…...

2023,软件测试人的未来在哪里?

2023年&#xff0c;IT行业出现空前的萧条&#xff0c;首先是年初一开始各大厂像着了魔似的不约而同的纷纷裁员、降薪、奖金包缩水&#xff0c;随之而来的是需求萎缩&#xff0c;HC减少或封锁等等。 而有幸未被列入裁员名单的在职人员&#xff0c;庆幸之余也心有余悸&#xff0…...

Python中的Numpy向量计算(R与Python系列第三篇)

目录 一、什么是Numpy? 二、如何导入NumPy? 三、生成NumPy数组 3.1利用序列生成 3.2使用特定函数生成NumPy数组 &#xff08;1&#xff09;使用np.arange() &#xff08;2&#xff09;使用np.linspace() 四、NumPy数组的其他常用函数 &#xff08;1&#xff09;np.z…...

LeetCode刷题笔记【27】:贪心算法专题-5(无重叠区间、划分字母区间、合并区间)

文章目录 前置知识435. 无重叠区间题目描述参考<452. 用最少数量的箭引爆气球>, 间接求解直接求"重叠区间数量" 763.划分字母区间题目描述贪心 - 建立"最后一个当前字母"数组优化marker创建的过程 56. 合并区间题目描述解题思路代码① 如果有重合就合…...

nvidia-smi 命令详解

nvidia-smi 命令详解 1. nvidia-smi 面板解析2. 显存与GPU的区别 Reference: nvidia-smi命令详解 相关文章&#xff1a; nvidia-smi nvcc -V 及 CUDA、cuDNN 安装 nvidia-smi(NVIDIA System Management Interface) 是一种命令行实用程序&#xff0c;用于监控和管理 NVIDIA G…...

fork()函数的返回值

在程序中&#xff0c;int pd fork() 是一个典型的 fork() 调用。fork() 函数会创建一个新的进程&#xff0c;然后在父进程中返回子进程的进程ID&#xff08;PID&#xff09;&#xff0c;在子进程中返回0。所以 pd 的值会根据当前进程是父进程还是子进程而有所不同&#xff1a;…...

Stable Diffusion WebUI挂VPN不能跑图解决办法(Windows)

如何解决SD在打开VPN的状态不能运行的问题 在我们开VPN的时候会出现无法生成图片&#xff0c;也无法做其他任何事&#xff0c;这个时候是不是很着急呢&#xff1f; 别急&#xff0c;我这里会说明如何解决。 就像这样&#xff0c;运行半天生成不了图&#xff0c;有时还会出现…...

Android的本地数据

何为本地&#xff0c;即写完之后除非手动修改&#xff0c;否像嘎了一样在那固定死了 有些需求可能也会要求我们去写死数据&#xff0c;因为这需求是一成不变的&#xff0c;那么你通常会用什么方法写死呢&#xff1f; 1. 本地存储-SharedPreferences 此方法可以长时间保存于手…...

android NDK 开发包,网盘下载,不限速

记录下ndk 开发包的地址&#xff0c;分享给大家。 另外有Android studio的下载包&#xff0c; 在另一篇文章 链接&#xff1a;http://t.csdn.cn/JSr9x Android Studio.exe 下载 2023 最新更新&#xff0c;网盘下载_hsj-obj的博客-CSDN博客 主要是19-25&#xff0c;其他的没有…...

【每日一题Day320】LC2651计算列车到站时间 | 数学

计算列车到站时间【LC2651】](https://leetcode.cn/problems/calculate-delayed-arrival-time/) 给你一个正整数 arrivalTime 表示列车正点到站的时间&#xff08;单位&#xff1a;小时&#xff09;&#xff0c;另给你一个正整数 delayedTime 表示列车延误的小时数。 返回列车实…...

C语言柔性数组详解:让你的程序更灵活

柔性数组 一、前言二、柔性数组的用法三、柔性数组的内存分布四、柔性数组的优势五、总结 一、前言 仔细观察下面的代码&#xff0c;有没有看出哪里不对劲&#xff1f; struct S {int i;double d;char c;int arr[]; };还有另外一种写法&#xff1a; struct S {int i;double …...

Redis-带你深入学习数据类型list

目录 1、list列表 2、list相关命令 2.1、添加相关命令&#xff1a;rpush、lpush、linsert 2.2、查找相关命令&#xff1a;lrange、lindex、llen 2.3、删除相关命令&#xff1a;lpop、rpop、lrem、ltrim 2.4、修改相关命令&#xff1a;lset 2.5、阻塞相关命令&#xff1a…...

react拖拽依赖库react-dnd

注&#xff1a;对于表格自定义行可以拖拽和树自定义节点可以拖拽等比较适用&#xff0c;其余的拖拽处理可以使用dragstart&#xff0c;drop等js原生事件来实现 react-dnd使用方法很简单&#xff0c;直接上干货 第一步安装依赖并引入 import { DndProvider } from react-dnd;…...

win10环境安装使用docker-maxwell

目的&#xff1a;maxwell可以监控mysql数据变化&#xff0c;并同步到kafka、mq或tcp等。 maxwell和canal区别&#xff1a; maxwell更轻量&#xff0c;canal把表结构也输出了 docker bootstrap可导出历史数据&#xff0c;canal不能 环境 &#xff1a;win10&#xff0c;mysql5…...

Docker部署RabbitMQ

Docker部署RabbitMQ 介绍 RabbitMQ是一个开源的消息队列系统&#xff0c;它被设计用于在应用程序之间传递消息。它采用了AMQP&#xff08;高级消息队列协议&#xff09;作为底层通信协议&#xff0c;这使得它能够在不同的应用程序之间进行可靠的消息传递。 那么&#xff0c;…...

23个react常见问题

1、setState 是异步还是同步&#xff1f; 合成事件中是异步 钩子函数中的是异步 原生事件中是同步 setTimeout中是同步 相关链接&#xff1a;你真的理解setState吗&#xff1f;&#xff1a; 2、聊聊 react16.4 的生命周期 图片 相关连接&#xff1a;React 生命周期 我对 Reac…...

【python基础】——Anaconda下包更新的坑及安装与卸载、及安装后Jupyter Notebook没反应的解决方法

文章目录 前言一、起因:如何一步步走到卸载重装anaconda?二、卸载anaconda二、重新安装anaconda三、关于安装Anaconda后,打开Jupyter Notebook运行代码没反应且in[ ]没有*前言 本文主要用来记录自己近期踩坑的一些复盘。其中坑有: ‘.supxlabel’ 不起作用的解决pip list 与…...

CSS 中的 display 和 visibility

CSS 中的 display 和 visibility 都可以设置一个元素在浏览器中的显示或隐藏效果。 display: 隐藏某个元素时&#xff0c;不会占用任何空间。换句话讲&#xff0c;不会影响布局。visibility: 隐藏某个元素时&#xff0c;仍需占用与未隐藏之前一样的空间。换句话讲&#xff0c;…...

解决mysql报错this is incompatible with DISTINCT

环境 centos 9 php7.4 mysql5.7 问题 mysql查询报如下错误&#xff1a; SQLSTATE[HY000]: General error: 3065 Expression #1 of ORDER BY clause is not in SELECT list, references column hst_csc.q.timestamp which is not in SELECT list; this is incompatible with…...

C++-map和set

本期我们来学习map和set 目录 关联式容器 键值对 pair 树形结构的关联式容器 set multiset map multimap 关联式容器 我们已经接触过 STL 中的部分容器&#xff0c;比如&#xff1a; vector 、 list 、 deque 、forward_list(C11)等&#xff0c;这些容器统称为序列式…...

微信小程序AI类目-深度合成-AI问答/AI绘画 互联网信息服务算法备案审核通过教程

近期小程序审核规则变化后&#xff0c;很多使用人类小徐提供的chatGPT系统的会员上传小程序无法通过审核&#xff0c;一直提示需要增加深度合成-AI问答、深度合成-AI绘画类目&#xff0c;该类目需要提供互联网信息服务算法备案并上传资质&#xff0c;一般对企业来说这种务很难实…...

蓝桥杯官网练习题(星期一)

题目描述 本题为填空题&#xff0c;只需要算出结果后&#xff0c;在代码中使用输出语句将所填结果输出即可。 整个 2020 世纪&#xff08;1901 年 1 月 1 日至 2000 年 12 月 3131 日之间&#xff09;&#xff0c;一共有多少个星期一&#xff1f;(不要告诉我你不知道今天是星…...

centos7更新podman

实验环境&#xff1a;centos7.7.1908 1.安装podman并查看版本 yum install podman podman -v 当前podman版本信息是1.6.4 2.更新podman版本 通过查看资料显示centos 7 支持最高版本为 3.4.4&#xff0c;更新podman大致有以下四步&#xff1a; golang 安装(本次使用版本: 1.…...

Java特性之设计模式【抽象工厂模式】

一、抽象工厂模式 概述 抽象工厂模式&#xff08;Abstract Factory Pattern&#xff09;是围绕一个超级工厂创建其他工厂。该超级工厂又称为其他工厂的工厂。这种类型的设计模式属于创建型模式&#xff0c;它提供了一种创建对象的最佳方式 在抽象工厂模式中&#xff0c;接口是…...

机器学习简介

引言 为何现在机器学习如此热门&#xff1f; 主要原因是由于“人类无论如何也做不到在短时间内实现从大量的数据中自动的计算出正确的结果操作”。 什么是机器学习&#xff1f; 所谓的机器学习&#xff0c;就是通过对数据进行反复的学习&#xff0c;来找出其中潜藏的规律和模式…...

如何自学建网站/学生制作个人网站

交换机中line-rate用于端口限速&#xff0c;主要用于出端口上&#xff1b;traffic-limit用于流限速&#xff0c;主要用于入端口上。由于其实现机制原因可能导致一些软件测速工具&#xff08;如ftp和chariot等&#xff09;测试交换机traffic-limit时数据不准确&#xff0c;而测试…...

html5手机网站开发工具/seo优化招聘

为什么80%的码农都做不了架构师&#xff1f;>>> 最近在开发中碰到当手机没有网络的时候&#xff0c;WebView加载本地缓存出了问题&#xff0c;界面变得很乱&#xff0c;初步断定是样式表没有加载上来。 WebView的缓存策略是这样的&#xff1a;webSettings.setCache…...

大恒建设集团有限公司网站/有创意的网络广告案例

查找前n个连续匹配值 返回[_First,_Last)区间内连续_Count_raw个元素造成_Pred(elem,val)为true的第一元素位置 返回[_First,_Last)区间内连续_Count个元素值都等于value中的第一元素位置 如果没有找到匹配元素&#xff0c;返回end 复杂度&#xff1a;线性 最多比较numElems …...

wordpress5.0.2编辑器/seo优化顾问服务

最近遇到一个问题&#xff1a;在使用ehcache时&#xff0c;通过CacheManager.getCache(chachename).get(key)&#xff0c;获取相应的缓存内对象&#xff08;当时这个对象是个list&#xff09;&#xff0c; 有个同事写个方法使用 removeall 来删除list中不需要的对象&#xff0c…...

做网站的动态图片/百度快照怎么弄

此问题已解决。请看&#xff1a; IOS APP 国际化 程序内切换语言实现 不重新启动系统&#xff08;完美解决方案&#xff09; 接了个变态的需求,要在程序内切换程序语言实现国际化。 可以先看看这个&#xff0c;比较详细。 http://blog.csdn.net/xwren362922604/article/detail…...

cms适合做什么网站/全球新冠疫情最新消息

目录1.适配器1.什么是适配器2.缺省值3.当第二个参数是一些别的容器的时候4.简单介绍deque5.代码&#xff1a;1.适配器 1.什么是适配器 适配器是一种设计模式(设计模式是一套被反复使用的、多数人知晓的、经过分类编目的、代码设计经验的总结)&#xff0c;该种模式是将一个类的…...