当前位置: 首页 > news >正文

男朋友抱着我在教室做网站/百度权重提升

男朋友抱着我在教室做网站,百度权重提升,建设工程网教育网官网,自建网站有哪些springboot 连接 kafka集群 一、环境搭建1.1 springboot 环境1.2 kafka 依赖 二、 kafka 配置类2.1 发布者2.1.1 配置2.1.2 构建发布者类2.1.3 发布消息 2.2 消费者2.2.1 配置2.2.2 构建消费者类2.2.3 进行消息消费 一、环境搭建 1.1 springboot 环境 JDK 11 Maven 3.8.x spr…

springboot 连接 kafka集群

  • 一、环境搭建
    • 1.1 springboot 环境
    • 1.2 kafka 依赖
  • 二、 kafka 配置类
    • 2.1 发布者
      • 2.1.1 配置
        • 2.1.2 构建发布者类
        • 2.1.3 发布消息
    • 2.2 消费者
      • 2.2.1 配置
      • 2.2.2 构建消费者类
      • 2.2.3 进行消息消费

一、环境搭建

1.1 springboot 环境

JDK 11+
Maven 3.8.x+
springboot 2.5.4 +

1.2 kafka 依赖

springboot的pom文件导入

       <dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka-test</artifactId><scope>test</scope></dependency><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>3.4.0</version></dependency>

二、 kafka 配置类

2.1 发布者

2.1.1 配置

发布者我们使用 KafkaTemplate 来进行消息发布,所以需要先对其进行一些必要的配置。

@Configuration
@EnableKafka
public class KafkaConfig {/***** 发布者 *****///生产者工厂@Beanpublic ProducerFactory<Integer, String> producerFactory() {return new DefaultKafkaProducerFactory<>(producerConfigs());}//生产者配置@Beanpublic Map<String, Object> producerConfigs() {Map<String, Object> props = new HashMap<>();props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.2.83:9092,192.168.2.84:9092,192.168.2.86:9092");props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);return props;}//生产者模板@Beanpublic KafkaTemplate<Integer, String> kafkaTemplate() {return new KafkaTemplate<>(producerFactory());}
}

2.1.2 构建发布者类

配置完发布者,下来就是发布消息,我们需要继承 ProducerListener<K, V> 接口,该接口完整信息如下:

public interface ProducerListener<K, V> {void onSuccess(ProducerRecord<K, V> producerRecord, RecordMetadata recordMetadata);void onError(ProducerRecord<K, V> producerRecord, RecordMetadata recordMetadata,Exception exception);}

实现该接口的方法,我们可以获取包含发送结果(成功或失败)的异步回调,也就是可以在这个接口的实现中获取发送结果。

我们简单的实现构建一个发布者类,接收主题和发布消息参数,并打印发布结果。

@Component
public class KafkaProducer implements ProducerListener<Object,Object> {private static final Logger producerlog = LoggerFactory.getLogger(KafkaProducer.class);private final KafkaTemplate<Integer, String> kafkaTemplate;public KafkaProducer(KafkaTemplate<Integer, String> kafkaTemplate) {this.kafkaTemplate = kafkaTemplate;}public void producer (String msg,String topic){ListenableFuture<SendResult<Integer, String>> future = kafkaTemplate.send(topic,0, msg);future.addCallback(new KafkaSendCallback<Integer, String>() {@Overridepublic void onSuccess(SendResult<Integer, String> result) {producerlog.info("发送成功 {}", result);}@Overridepublic void onFailure(KafkaProducerException ex) {ProducerRecord<Integer, String> failed = ex.getFailedProducerRecord();producerlog.info("发送失败 {}",failed);}});}}

2.1.3 发布消息

写一个controller类来测试我们构建的发布者类,这个类中打印接收到的消息,来确保信息接收不出问题。

@RestController
public class KafkaTestController {private static final Logger kafkaTestLog = LoggerFactory.getLogger(KafkaTestController.class);@Resourceprivate KafkaProducer kafkaProducer;@GetMapping("/kafkaTest")public void kafkaTest(String msg,String topic){kafkaProducer.producer(msg,topic);kafkaTestLog.info("接收到消息 {} {}",msg,topic);}
}

一切准备就绪,我们启动程序利用postman来进行简单的测试。

进行消息发布:
在这里插入图片描述

发布结果:
在这里插入图片描述
可以看到消息发送成功。

我们再看看kafka消费者有没有接收到消息:

在这里插入图片描述

看以看到,kakfa的消费者也接收到了消息。

2.2 消费者

2.2.1 配置

消息的接受有多种方式,我们这里选择的是使用 @KafkaListener 注解来进行消息接收。它的使用像下面这样:

public class Listener {@KafkaListener(id = "foo", topics = "myTopic", clientIdPrefix = "myClientId")public void listen(String data) {...}}

看起来不是太难吧,但使用这个注解,我们需要配置底层 ConcurrentMessageListenerContainer.kafkaListenerContainerFactor。

我们在原来的kafka配置类 KafkaConfig 中,继续配置消费者,大概就像下面这样

@Configuration
@EnableKafka
public class KafkaConfig {/***** 发布者 *****///生产者工厂@Beanpublic ProducerFactory<Integer, String> producerFactory() {return new DefaultKafkaProducerFactory<>(producerConfigs());}//生产者配置@Beanpublic Map<String, Object> producerConfigs() {Map<String, Object> props = new HashMap<>();props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.2.83:9092,192.168.2.84:9092,192.168.2.86:9092");props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);return props;}//生产者模板@Beanpublic KafkaTemplate<Integer, String> kafkaTemplate() {return new KafkaTemplate<>(producerFactory());}/***** 消费者 *****///容器监听工厂@BeanKafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>>kafkaListenerContainerFactory() {ConcurrentKafkaListenerContainerFactory<Integer, String> factory =new ConcurrentKafkaListenerContainerFactory<>();factory.setConsumerFactory(consumerFactory());factory.setConcurrency(3);factory.getContainerProperties().setPollTimeout(3000);return factory;}//消费者工厂@Beanpublic ConsumerFactory<Integer, String> consumerFactory() {return new DefaultKafkaConsumerFactory<>(consumerConfigs());}//消费者配置@Beanpublic Map<String, Object> consumerConfigs() {Map<String, Object> props = new HashMap<>();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.2.83:9092,192.168.2.84:9092,192.168.2.86:9092");props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);props.put(ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS, JsonDeserializer.class);props.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class.getName());props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG,3000);return props;}
}

注意,要设置容器属性必须使用getContainerProperties()工厂方法。它用作注入容器的实际属性的模板

2.2.2 构建消费者类

配置好后,我们就可以使用这个注解了。这个注解的使用有多种方式:

1、用它来覆盖容器工厂的concurrency和属性

@KafkaListener(id = "myListener", topics = "myTopic",autoStartup = "${listen.auto.start:true}", concurrency = "${listen.concurrency:3}")
public void listen(String data) {...
}

2、可以使用显式主题和分区(以及可选的初始偏移量)

@KafkaListener(id = "thing2", topicPartitions ={ @TopicPartition(topic = "topic1", partitions = { "0", "1" }),@TopicPartition(topic = "topic2", partitions = "0",partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "100"))})
public void listen(ConsumerRecord<?, ?> record) {...
}

3、将初始偏移应用于所有已分配的分区

@KafkaListener(id = "thing3", topicPartitions ={ @TopicPartition(topic = "topic1", partitions = { "0", "1" },partitionOffsets = @PartitionOffset(partition = "*", initialOffset = "0"))})
public void listen(ConsumerRecord<?, ?> record) {...
}

4、指定以逗号分隔的分区列表或分区范围

@KafkaListener(id = "pp", autoStartup = "false",topicPartitions = @TopicPartition(topic = "topic1",partitions = "0-5, 7, 10-15"))
public void process(String in) {...
}

5、可以向侦听器提供Acknowledgment

@KafkaListener(id = "cat", topics = "myTopic",containerFactory = "kafkaManualAckListenerContainerFactory")
public void listen(String data, Acknowledgment ack) {...ack.acknowledge();
}

6、添加标头

@KafkaListener(id = "list", topics = "myTopic", containerFactory = "batchFactory")
public void listen(List<String> list,@Header(KafkaHeaders.RECEIVED_KEY) List<Integer> keys,@Header(KafkaHeaders.RECEIVED_PARTITION) List<Integer> partitions,@Header(KafkaHeaders.RECEIVED_TOPIC) List<String> topics,@Header(KafkaHeaders.OFFSET) List<Long> offsets) {...
}

我们这里写一个简单的,只用它来接受指定主题的数据:

@Component
public class KafkaConsumer {private static final Logger consumerlog = LoggerFactory.getLogger(KafkaConsumer.class);@KafkaListener(topicPartitions  = @TopicPartition(topic = "kafka-topic-test",partitions = "0"))public void consumer (String data){consumerlog.info("消费者接收数据 {}",data);}
}

这里解释一下,因为我们进行了手动分配主题/分区,所以 注解中group.id 可以为空。若要指定group.id请在消费者配置中加上props.put(ConsumerConfig.GROUP_ID_CONFIG, “bzt001”); 或在 @TopicPartition 注解后加上 groupId = “组id”

2.2.3 进行消息消费

继续使用postman调用我们写好的发布者发布消息,观察控制台的消费者类是否有相关日志出现。
在这里插入图片描述

相关文章:

springboot 连接 kafka集群(kafka版本 2.13-3.4.0)

springboot 连接 kafka集群 一、环境搭建1.1 springboot 环境1.2 kafka 依赖 二、 kafka 配置类2.1 发布者2.1.1 配置2.1.2 构建发布者类2.1.3 发布消息 2.2 消费者2.2.1 配置2.2.2 构建消费者类2.2.3 进行消息消费 一、环境搭建 1.1 springboot 环境 JDK 11 Maven 3.8.x spr…...

Nacos配置中心使用(Spring Cloud版)

目标 向项目中集成Nacos配置。原项目是一个SpringBoot项目。这里假设我们无法修改原有项目的SpringBoot版本。 注意 在不动SpringBoot版本的前提下&#xff0c;根据SpringBoot的版本&#xff0c;确定Spring Cloud和Nacos版本。Nacos版本其实就是Spring Cloud Alibaba版本。在…...

STM32F407硬件I2C实现MPU6050通讯(CUBEIDE)

STM32F407硬件I2C实现MPU6050通讯 文章目录 STM32F407硬件I2C实现MPU6050通讯cubeide设置写操作与读操作函数实现复位&#xff0c;读取温度&#xff0c;角度等函数封装mpu6050.cmpu6050.h代码分析 DMP移植1.修改头文件路径为自己的头文件路径2.修改I2C读写函数为自己mcu平台的读…...

HTML5 语义元素(一)页面结构

本篇主要介绍HTML5增加的语义元素中关于页面结构方面的&#xff0c;包含&#xff1a; <article>、<aside>、<figure>、<figcaption>、<footer>、<header>、<main>、<nav>、<section>等元素。 目录 1. 语义元素介绍 1.…...

嵌套滚动实践:onInterceptTouchEvent与NestedScrolling【实用为准】

嵌套滚动&#xff1a;内外两层均可滚动&#xff0c;比如上半部分是一个有限的列表&#xff0c;下半部分是WebView&#xff0c;在内层上半部分展示到底的时候&#xff0c;外部父布局整体滚动内部View&#xff0c;将底部WevView拉起来&#xff0c;滚动到顶部之后再将滚动交给内部…...

Redis入门 - 5种基本数据类型

原文首更地址&#xff0c;阅读效果更佳&#xff01; Redis入门 - 5种基本数据类型 | CoderMast编程桅杆https://www.codermast.com/database/redis/five-base-datatype.html 说明 在我们平常的业务中基本只会使用到Redis的基本数据类型&#xff08;String、List、Hash、Set、…...

mybatis-plus用法(一)

MyBatis-plus 是一款 Mybatis 增强工具&#xff0c;用于简化开发&#xff0c;提高效率。下文使用缩写 mp来简化表示 MyBatis-plus&#xff0c;本文主要介绍 mp 整合 Spring Boot 的使用。 (5条消息) mybatis-plus用法&#xff08;二&#xff09;_渣娃工程师的博客-CSDN博客 1…...

源码安装包管理

1. 源码包基本概述 在linux环境下面安装源码包是比较常见的, 早期运维管理工作中&#xff0c;大部分软件都是通过源码安装的。那么安装一个源码包&#xff0c;是需要我们自己把源代码编译成二进制的可执行文件。 源码包的编译用到了linux系统里的编译器&#xff0c;通常源码包…...

Vue|获取表单数据

在Vue中获取表单数据有多种方式&#xff0c;具体取决于你使用的是哪种表单元素和你的需求。 1. 单个表单元素&#xff1a; 如果你只需要获取单个表单元素的值&#xff0c;可以使用v-model指令将表单元素的值绑定到Vue实例的一个属性上。例如&#xff1a; <input type&quo…...

微信小程序入门学习02-TDesign中的自定义组件

目录 1 显示文本2 自定义组件3 变量定义4 值绑定总结 我们上一篇讲解了TDesign模板的基本用法&#xff0c;如何开始阅读模板。本篇我们讲解一下自定义组件的用法。 1 显示文本 官方模板在顶部除了显示图片外&#xff0c;还显示了一段文字介绍。文字是嵌套在容器组件里&#xf…...

【linux kernel】linux media子系统分析之media控制器设备

文章目录 一、抽象媒体设备模型二、媒体设备三、Entity四、Interfaces五、Pad六、Link七、Media图遍历八、使用计数和电源处理九、link设置十、Pipeline和Media流十一、链接验证十二、媒体控制器设备的分配器API 本文基于linux内核 4.19.4&#xff0c;抽象媒体设备模型框架的相…...

Scala--03

第6章 面向对象 Scala 的面向对象思想和Java 的面向对象思想和概念是一致的。 Scala 中语法和 Java 不同&#xff0c;补充了更多的功能。 6.1类和对象详解 6.1.1组成结构 构造函数: 在创建对象的时候给属性赋值 成员变量: 成员方法(函数) 局部变量 代码块 6.1.2构造器…...

【MongoDB】--MongoDB高级功能

目录 一、前言二、聚合管道aggregate1、示例说明2、具体代码实现一、前言 这里主要记录mongodb一些高级功能使用,如聚合。 二、聚合管道aggregate 聚合操作将来自多个文档的值组合在一起,并且可以对分组数据执行各种操作以返回单个结果,主要用于处理数据(诸如统计平均值,…...

C# new与malloc

目录 C# new与malloc C# new与malloc的区别 C# new关键字底层做的操作 C# new与malloc new关键字&#xff1a; new关键字在C#中用于实例化对象&#xff0c;并为其分配内存。它是面向对象编程的基本操作之一。使用new关键字可以在托管堆上分配内存&#xff0c;同时调用对象的构…...

微软MFC技术简明介绍

我是荔园微风&#xff0c;作为一名在IT界整整25年的老兵&#xff0c;今天来看一下微软MFC技术简明介绍 Visual C 与 MFC 微软公司于1992年上半年推出了C/C 7.0 产品时初次向世人介绍了MFC 1.0&#xff0c;这个产品包含了20,000行C原始代码&#xff0c;60个以上的Windows相关类…...

汽车电子Autosar之车载以太网

前言 近些年来&#xff0c;随着为了让汽车更加安全、智能、环保等&#xff0c;一系列的高级辅助驾驶功能喷涌而出。未来满足这些需求&#xff0c;就对传统的电子电器架构带来了严峻的考验&#xff0c;需要越来越多的电子部件参与信息交互&#xff0c;导致对网络传输速率&#x…...

MSP430_C语言例程注释详

本章选择了一些简单的C语言程序例题&#xff0c;这些程序的结构简单&#xff0c;编程技巧不多&#xff0c;题目虽然 简单&#xff0c;但是非常适合入门单片机的学习者学习MSP430单片机的C 语言编程。 如下列出了C语言例题运行的MSP430F149实验板硬件资源环境&#xff0c;熟悉…...

Vb+access库存管理系统(论文+开题报告+源代码+目录)

库存信息管理系统的基本问题1.1 库存信息管理系统的简介 本系统是为了提高腾达公司自动化办公的水平、经过详细的调查分析初步制定了腾达公司库存信息管理系统。基于WINDOWS 98 平台,使用Microsoft Access97, 在Visual Basic 6.0编程环境下开发的库存信息管理系统。该系统采用…...

Java 数组

在 Java 语言中&#xff0c;数组是一种基本的数据结构&#xff0c;可以存储一组相同类型的数据。本篇技术博客将详细介绍 Java 语言中的数组&#xff0c;包括一维数组和多维数组&#xff0c;以及数组的使用方法和注意事项。 一维数组 一维数组是指只有一行的数组&#xff0c;…...

CSDN 编程竞赛五十八期题解

竞赛总览 CSDN 编程竞赛五十八期&#xff1a;比赛详情 (csdn.net) 竞赛题解 题目1、打家劫舍 有一个小偷计划偷窃沿街的房屋&#xff0c;每间房内都藏有一定的现金&#xff0c;影响偷窃行为的唯一制约因素就是相邻的房屋装有相互连通的防盗系统。如果两间相邻的房屋在同一晚…...

Unity入门6——光源组件

一、参数面板 二、参数介绍 Type&#xff1a;光源类型 Spot&#xff1a;聚光灯 Range&#xff1a;发光距离Spot Angle&#xff1a;光锥角度Directional&#xff1a;方向光Point&#xff1a;点光源Area&#xff08;Baked Only&#xff09;&#xff1a;面光源 仅烘焙。预先算好&…...

C语言之动态内存分配(1)

目录 本章重点 为什么存在动态内存分配 动态内存函数的介绍 malloc free calloc realloc 常见的动态内存错误 几个经典的笔试题 柔性数组 动态内存管理—自己维护自己的内存空间的大小 首先我们申请一个变量&#xff0c;再申请一个数组 这是我们目前知道的向内存申请…...

AIGC新时代,注意政策走向,产业方向,拥抱可信AI。需要了解基本理论,基础模型,前沿进展,产品应用,以及小小的项目复现

AIGC&#xff08;AI-Generated Content&#xff0c;AI生成内容&#xff09;是指基于生成对抗网络&#xff08;GAN&#xff09;、大型预训练模型等人工智能技术的方法&#xff0c;通过对已有数据进行学习和模式识别&#xff0c;以适当的泛化能力生成相关内容的技术。类似的概念还…...

如何白嫖一年CSDN会员?618活动!亲测有效!!!

活动详情 CSDN会员免费送一年&#xff0c;仅剩3天&#xff01; 下载权益延长一年&#xff01; 一年一次的机会&#xff0c;错过了就要再等明年&#xff01; 博主已经领取到了&#xff01; 会员权益 1、修改专属域名&#xff0c;别人都是https://blog.csdn.net/qq_xxxxxxxx&a…...

微服务: 00-rabbitmq出现的异常以及解决方案

目录 前言: 问题概述: 1. rabbitmq初始安装配置异常 -> 1.1 rabbitmq报您与此网站连接不是私密连接 --->1.1.1 上述问题解决方案 ---> 1.1.2 依次执行下面代码 -> 1.2 解决用户的No access情况 -> 1.2.1 使用设置的账号密码进行登录 -> 1.2.2 点击 Ad…...

Vue3与Vue2比较

Vue.js 3相对于Vue.js 2带来了一些重大变化&#xff0c;其中包括一些语法变化。 下面是Vue.js 2和Vue.js 3的一些语法差异比较&#xff1a; 一、语法差异比较 1.组件的注册方式不同 在Vue.js 2中&#xff0c;我们使用Vue.component()或者Vue.extend()方式创建一个组件。但是…...

如何对待工作中的失误?

在日复一日的工作中&#xff0c;我们免不了会产生一些失误&#xff0c;会因此感到沮丧和失望。但如何正确地对待和处理这些失误才是最重要的&#xff0c;它直接影响到我们的工作表现和个人成长。一起来谈谈作为职场人的你时如何处理工作中的失误的吧&#xff01; 一、在面对失…...

使用css3如何实现一个文字打印效果

前言 在很多网站首页介绍页里,为了吸引用户,暂留更长时间,使用了一些css3动画的 示例效果 文字打印.gif 实现这个动画原理 想要实现这个动画,改变元素的宽度,结合动画css3关键帧实现 具体代码如下所示 <!DOCTYPE html> <html lang"en"><head><m…...

【雕爷学编程】Arduino动手做(115)---HB100多普勒雷达模块

37款传感器与执行器的提法&#xff0c;在网络上广泛流传&#xff0c;其实Arduino能够兼容的传感器模块肯定是不止这37种的。鉴于本人手头积累了一些传感器和执行器模块&#xff0c;依照实践出真知&#xff08;一定要动手做&#xff09;的理念&#xff0c;以学习和交流为目的&am…...

深度学习笔记之Transformer(一)注意力机制基本介绍

深度学习笔记之Transformer——注意力机制基本介绍 引言回顾&#xff1a; Seq2seq \text{Seq2seq} Seq2seq模型中的注意力机制注意力机制的简单描述注意力机制的机器学习范例&#xff1a; Nadaraya-Watson \text{Nadaraya-Watson} Nadaraya-Watson核回归 Nadaraya-Watson \text…...