当前位置: 首页 > news >正文

【rabbitmq】实现问答消息消费示例

目录

          • 1. 说明
          • 2. 截图
            • 2.1 接口调用截图
            • 2.2 项目结构截图
          • 3. 代码示例

1. 说明
  • 1.实现的是一个简单的sse接口,单向的长连接,后端可以向前端不断输出数据。
  • 2.通过调用sse接口,触发rabbitmq向队列塞消息,向前端返回一个sseEmitter对象。
  • 3.rabbitmq监听队列消息,消费消息后,向sseEmitter对象写入内容。
  • 4.当业务逻辑结束,调用emitter.complete()方法,结束此次会话。
  • 5.这里举一个问答的示例,采用的是work模式,逻辑比较简单,仅供参考。
2. 截图
2.1 接口调用截图

在这里插入图片描述

2.2 项目结构截图

在这里插入图片描述

3. 代码示例
  • 1.pom依赖
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.5.4</version></parent><groupId>com.learning</groupId><artifactId>springboot</artifactId><version>1.0-SNAPSHOT</version><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId><version>2.5.4</version></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><dependency><groupId>com.fasterxml.jackson.core</groupId><artifactId>jackson-databind</artifactId><version>2.12.4</version></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></dependency></dependencies><!--打jar包使用--><build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId></plugin></plugins></build>
</project>
  • 2.application.yaml
spring:# rabbbitmq配置信息rabbitmq:host: 192.168.2.11port: 5672username: adminpassword: adminvirtual-host: /
  • 3.rabbitmq配置类
package com.learning.config;import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;/*** rabbitmq配置类*/
@Configuration  
public class RabbitMQConfig{/*** 存sseEmitter*/@Bean("emitterMap")public ConcurrentMap<String, SseEmitter> emitterMap(){ConcurrentMap<String, SseEmitter> emitters = new ConcurrentHashMap<>();return emitters;}/*** 工作模式,交换机名*/public static final String EXCHANGE_NAME = "work_exchange";/*** 工作模式,队列名*/public static final String QUEUE_NAME = "work_queue";  @Bean("work_queue")public Queue queue() {  return new Queue(QUEUE_NAME, true);  }  @Bean("work_exchange")public Exchange exchange() {return ExchangeBuilder.directExchange(EXCHANGE_NAME).durable(true).build();}  @Bean  public Binding binding(@Qualifier("work_queue") Queue queue,@Qualifier("work_exchange") Exchange exchange) {return BindingBuilder.bind(queue).to(exchange).with("work_routing_key").noargs();}  
}
  • 4.controller类
package com.learning.controller;import com.learning.service.QuestionService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.*;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import java.util.UUID;
import java.util.concurrent.ConcurrentMap;/*** @Author wangyouhui* @Description 获取答案**/
@RestController
@RequestMapping("/question")
public class QuestionController {@Autowiredprivate QuestionService questionService;@Autowiredprivate ConcurrentMap<String, SseEmitter> emitterMap;@PostMapping(value="/ask", produces = MediaType.TEXT_EVENT_STREAM_VALUE)public SseEmitter ask(@RequestParam String question) {String questionId = UUID.randomUUID().toString();SseEmitter emitter = new SseEmitter();emitterMap.put(questionId, emitter);questionService.ask(questionId, question);return emitter;}
}
  • 5.消息实体
package com.learning.dto;import lombok.Data;import java.io.Serializable;/*** @Author wangyouhui* @Description 消息**/
@Data
public class MessageDTO implements Serializable {private String questionId;private String question;private String answer;private Boolean end;
}
  • 6.service实现类
package com.learning.service.impl;import com.fasterxml.jackson.databind.ObjectMapper;
import com.learning.config.RabbitMQConfig;
import com.learning.dto.MessageDTO;
import com.learning.service.QuestionService;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;import java.io.IOException;
import java.util.concurrent.ConcurrentMap;/*** @Author wangyouhui* @Description**/
@Service
public class QuestionServiceImpl implements QuestionService {@Autowiredprivate RabbitTemplate rabbitTemplate;@Autowiredprivate ConcurrentMap<String, SseEmitter> emitterMap;@RabbitListener(queues = RabbitMQConfig.QUEUE_NAME, ackMode = "MANUAL")public void receiveMessage(Message message, Channel channel) {long deliveryTag = message.getMessageProperties().getDeliveryTag();try {String json = new String(message.getBody());ObjectMapper mapper = new ObjectMapper();MessageDTO messageDTO = mapper.readValue(json, MessageDTO.class);SseEmitter sseEmitter = emitterMap.get(messageDTO.getQuestionId());if(sseEmitter != null){sseEmitter.send(messageDTO);}if(messageDTO.getEnd() != null && messageDTO.getEnd()){sseEmitter.complete();emitterMap.remove(messageDTO.getQuestionId());}// 手动签收channel.basicAck(deliveryTag, false);} catch (IOException e) {e.printStackTrace();// 拒绝签收,消息重新入队try {channel.basicReject(deliveryTag, true);} catch (IOException ioException) {ioException.printStackTrace();}}}@Overridepublic void ask(String questionId, String question) {MessageDTO message1 = new MessageDTO();message1.setQuestionId(questionId);message1.setQuestion(question);message1.setAnswer("您好,这个");message1.setEnd(false);this.sendMessage(message1);try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}MessageDTO message2 = new MessageDTO();message2.setQuestionId(questionId);message2.setQuestion(question);message2.setAnswer("您好,这个是答案");message2.setEnd(false);this.sendMessage(message2);try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}MessageDTO message3 = new MessageDTO();message3.setQuestionId(questionId);message3.setQuestion(question);message3.setAnswer("您好,这个是答案,请问是否能解决你的问题");message3.setEnd(true);this.sendMessage(message3);}public void sendMessage(MessageDTO message){ObjectMapper mapper = new ObjectMapper();String json = null;try {json = mapper.writeValueAsString(message);rabbitTemplate.convertAndSend("work_exchange", "work_routing_key", json);} catch (Exception e) {e.printStackTrace();}}
}
  • 7.service接口
package com.learning.service;/*** @Author wangyouhui* @Description **/
public interface QuestionService {void ask(String questionId, String question);
}
  • 8.应用类
package com.learning;import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;/*** @Author wangyouhui* @Description**/
@SpringBootApplication
public class Application {public static void main(String[] args) {SpringApplication.run(Application.class, args);}
}

相关文章:

【rabbitmq】实现问答消息消费示例

目录 1. 说明2. 截图2.1 接口调用截图2.2 项目结构截图 3. 代码示例 1. 说明 1.实现的是一个简单的sse接口&#xff0c;单向的长连接&#xff0c;后端可以向前端不断输出数据。2.通过调用sse接口&#xff0c;触发rabbitmq向队列塞消息&#xff0c;向前端返回一个sseEmitter对象…...

单片机_RTOS__架构概念

经典单片机程序 void main() {while(1){函数1&#xff08;&#xff09;&#xff1b;函数2&#xff08;&#xff09;&#xff1b;}} 有无RTOS区别 裸机 RTOS RTOS程序 喂饭&#xff08;&#xff09; {while&#xff08;1&#xff09;{喂一口饭&#xff08;&#xff09;;} } …...

ClickHouse在百度MEG数据中台的落地和优化

导读 百度MEG上一代大数据产品存在平台分散、质量不均和易用性差等问题&#xff0c;导致开发效率低下、学习成本高&#xff0c;业务需求响应迟缓。为了解决这些问题&#xff0c;百度MEG内部开发了图灵3.0生态系统&#xff0c;包括Turing Data Engine(TDE)计算引擎、Turing Dat…...

B/S架构(Browser/Server)与C/S架构(Client/Server)

基本概念 B/S架构&#xff08;Browser/Server&#xff09;&#xff1a;即浏览器/服务器架构。在这种架构中&#xff0c;用户通过浏览器&#xff08;如Chrome、Firefox、Safari等&#xff09;访问服务器上的应用程序。服务器端负责处理业务逻辑、存储数据等核心功能&#xff0c;…...

idea中自定义注释模板语法

文章目录 idea 自定义模板语法1.自定义模板语法是什么&#xff1f;2.如何在idea中设置呢&#xff1f; idea 自定义模板语法 1.自定义模板语法是什么&#xff1f; 打开我的idea&#xff0c;创建一个测试类&#xff1a; 这里看到我的 test 测试类里面会有注释&#xff0c;这是怎…...

基于SSM的儿童教育网站【附源码】

基于SpringBoot的课程作业管理系统&#xff08;源码L文说明文档&#xff09; 目录 4 系统设计 4.1 系统概述 4.2 系统模块设计 4.3.3 数据库表设计 5 系统实现 5.1 管理员功能模块的实现 5.1.1 视频列表 5.1.2 文章信息管理 5.1.3 文章类…...

深挖自闭症病因与孩子表现的关联

自闭症&#xff0c;亦称为孤独症&#xff0c;乃是一种对儿童发展有着严重影响的神经发育障碍性疾病。深入探寻自闭症的病因与孩子表现之间的联系&#xff0c;对于更深刻地理解并助力自闭症儿童而言&#xff0c;可谓至关重要。 当前&#xff0c;自闭症的病因尚未完全明晰&#x…...

[网络协议篇] UDP协议

文章目录 1. 简介2. 特点3. UDP数据报结构4. 基于UDP的应用层协议5. UDP安全性问题6. 使用udp传输数据的系统就一定不可靠吗&#xff1f;7. 基于UDP的主机探活 python实现 1. 简介 User Datagram Protocol&#xff0c;用户数据报协议&#xff0c;基于IP协议提供面向无连接的网…...

关系型数据库(1)----MySQL(初阶)

目录 1.mysql 2.mysqld 3.mysql架构 1.连接层 2.核心服务层 3.存储引擎层 4.数据存储层 4.SQL分类 5.MySQL操作库 6.MySQL数据类型 1. 数值类型 2. 日期和时间类型 3. 字符串类型 4. 空间类型 5. JSON数据类型 7.MySQL表的约束 1. 主键约束&#xff08;PRIMARY…...

计算机毕业设计Python+大模型租房推荐系统 租房大屏可视化 租房爬虫 hadoop spark 58同城租房爬虫 房源推荐系统

温馨提示&#xff1a;文末有 CSDN 平台官方提供的学长联系方式的名片&#xff01; 温馨提示&#xff1a;文末有 CSDN 平台官方提供的学长联系方式的名片&#xff01; 温馨提示&#xff1a;文末有 CSDN 平台官方提供的学长联系方式的名片&#xff01; 用到的技术: 1. python…...

深度学习技术演进:从 CNN、RNN 到 Transformer 的发展与原理解析

深度学习的技术演进经历了从卷积神经网络&#xff08;CNN&#xff09;到循环神经网络&#xff08;RNN&#xff09;再到 Transformer 的重要发展。这三个架构分别擅长处理图像、序列数据和多种任务的特征&#xff0c;标志着深度学习在不同领域取得的进步。 1. 卷积神经网络&…...

Lua中的goto语句

软考鸭微信小程序 过软考,来软考鸭! 提供软考免费软考讲解视频、题库、软考试题、软考模考、软考查分、软考咨询等服务 在Lua编程语言中&#xff0c;goto语句是一种跳转语句&#xff0c;用于将程序的执行流程无条件地转移到程序中的另一个位置。这个位置由一个标签&#xff08;…...

【rust实战】rust博客系统2_使用wrap启动rust项目服务

如何创建一个使用warp框架的rust项目1.使用cargo 创建项目 cargo new blog 2.添加warp依赖 1.cd blog 2.编辑Cargo.toml文件 添加warp 和 tokio 作为依赖项 在[dependencies]中添加 [package] name "blog" version "0.1.0" …...

【实战案例】Django框架使用模板渲染视图页面及异常处理

本文基于之前内容列表如下&#xff1a; 【图文指引】5分钟搭建Django轻量级框架服务 【实战案例】Django框架基础之上编写第一个Django应用之基本请求和响应 【实战案例】Django框架连接并操作数据库MySQL相关API 视图概述 Django中的视图的概念是一类具有相同功能和模板的网…...

设置K8s管理节点异常容忍时间

说明 每个节点上的 kubelet 需要定时向 apiserver 上报当前节点状态&#xff0c;如果两者间网络异常导致心跳终端&#xff0c;kube-controller-manager 中的 NodeController 会将该节点标记为 Unknown 或 Unhealthy&#xff0c;持续一段时间异常状态后 kube-controller-manage…...

什么样的JSON编辑器才好用

简介 JSON&#xff08;JavaScript Object Notation&#xff09;是一种轻量级的数据交换格式&#xff0c;易于人阅读和编写&#xff0c;同时也便于机器解析和生成。随着互联网和应用程序的快速发展&#xff0c;JSON已经成为数据传输和存储的主要格式之一。在处理和编辑JSON数据…...

ArkUI自定义TabBar组件

在ArkUI中的Tabs&#xff0c;通过页签进行内容视图切换的容器组件&#xff0c;每个页签对应一个内容视图。其中内容是图TabContent作为Tabs的自组件&#xff0c;通过给TabContent设置tabBar属性来自定义导航栏样式。现在我们就根据UI设计的效果图来实现下图效果&#xff1a; 根…...

pair类型应用举例

在main.cpp里输入程序如下&#xff1a; #include <iostream> //使能cin(),cout(); #include <utility> //使能pair数据类型; #include <string> //使能string字符串; #include <stdlib.h> //使能exit(); //pair类型可以将两个相同的或不同类…...

数字 图像处理算法的形式

一 基本功能形式 按图像处理的输出形式&#xff0c;图像处理的基本功能可分为三种形式。 1&#xff09;单幅图像 单幅图像 2&#xff09;多幅图像 单幅图像 3&#xff09;单&#xff08;或多&#xff09;幅图像 数字或符号等 二 几种具体算法形式 1.局部处理邻域对于任一…...

安徽对口高考Python试题选:输入一个正整数,然后输出该整数的3的幂数相加形式。

第一步&#xff1a;求出3的最高次幂是多少 guoint(input("请输入一个正整数:")) iguo a0 while i>0: if 3**i<guo: ai break ii-1print(a)#此语句为了看懂题目&#xff0c;题目中不需要打印出最高幂数 第二步…...

Node.js是什么? 能做什么?

‌Node.js是一个基于Chrome V8引擎的JavaScript运行环境&#xff0c;它使用事件驱动、非阻塞式I/O模型&#xff0c;使得JavaScript能够在服务器端运行。Node.js允许JavaScript脱离浏览器&#xff0c;直接在服务器和计算机上使用&#xff0c;极大地扩展了JavaScript的应用范围。…...

JVM快速入门

1、 JVM探究 面试问题 :谈谈对JVM的理解? java8虚拟机和之前的变化更新?什么是OOM,什么是栈溢出StackOverFlowError?怎么分析?JVM的常用调优参数有哪些?内存快照如何抓取,怎么分析Dump文件?知道吗?谈谈JVM中,类加载器你的认识?2、JVM的位置 3、JVM的体系结构 3.1…...

理解深度学习模型——高级音频特征表示的分层理解

理解深度学习模型可以是一个复杂的过程&#xff0c;因为这些模型通常包含大量的参数和层次。 &#xff08;1&#xff09;复杂性来源&#xff1a; 深度学习模型的复杂性来源于多个方面&#xff0c;包括模型的规模、层次结构、参数数量以及训练数据的复杂性。以下是一些关键点&a…...

【HarmonyOS Next】原生沉浸式界面

背景 在实际项目中&#xff0c;为了软件使用整体色调看起来统一&#xff0c;一般顶部和底部的颜色需要铺满整个手机屏幕。因此&#xff0c;这篇帖子是介绍设置的方法&#xff0c;也是应用沉浸式效果。如下图&#xff1a;底部的绿色延伸到上面的状态栏和下面的导航栏 UI 在鸿蒙…...

数据结构 ——— 树的概念及结构

目录 树的结构以及示意图 树的概念​编辑 树的结构与递归的关系​编辑 树的结构以及示意图 树是一种非线性的数据结构&#xff0c;它是由 n(n>0) 个有限节点组成一个具有层次关系的集合 把这种结构叫做树是因为它看起来像一棵倒挂的树 特点&#xff1a; 有一个特殊的…...

初探Vue前端框架

文章目录 简介什么是Vue概述优势MVVM框架 Vue的特性数据驱动视图双向数据绑定指令插件 Vue的版本版本概述新版本Vue 3Vue 3新特性UI组件库UI组件库概述常用UI组件库 安装Vue安装Vue查看Vue版本 实例利用Vue命令创建Vue项目切换工作目录安装vue-cli脚手架创建Vue项目启动Vue项目…...

Lucas带你手撕机器学习——岭回归

岭回归&#xff08;Ridge Regression&#xff09; 一、背景与引入 在进行线性回归分析时&#xff0c;我们常常面临多重共线性的问题。多重共线性指的是自变量之间高度相关&#xff0c;这会导致回归系数的不稳定性&#xff0c;使得模型的预测能力降低。传统的线性回归通过最小…...

C2W4.LAB.Word_Embedding.Part1

理论课&#xff1a;C2W4.Word Embeddings with Neural Networks 文章目录 Word Embeddings First Steps: Data PreparationCleaning and tokenizationSliding window of wordsTransforming words into vectors for the training setMapping words to indices and indices to w…...

hive初体验

1.首先&#xff0c;确保启动了Metastore服务。 runjar就是metastore进程 2.进入hive客户端: 命令:hive 3.操作:没有指定数据库时默认在default 一:创建表:CREATE TABLE test(id INT, name STRING, gender STRING); 完成,show tables看一下 也可以通过hdfs文件系统查看,默认路径…...

云渲染主要是分布式(分机)渲染,如何使用blender云渲染呢?

云渲染主要是分布式&#xff08;分机&#xff09;渲染&#xff0c;比如一个镜头同时开20-100张3090显卡的机器渲染&#xff0c;就能同时渲染20-100帧&#xff0c;渲染不仅不占用自己电脑&#xff0c;效率也将增加几十上百倍&#xff01; blender使用教程如下&#xff1a; 第一…...

营销型网站建设教程/新媒体运营培训

题库来源&#xff1a;安全生产模拟考试一点通公众号小程序 2021年低压电工答案解析及低压电工实操考试视频&#xff0c;包含低压电工答案解析答案和解析及低压电工实操考试视频练习。由安全生产模拟考试一点通公众号结合国家低压电工考试最新大纲及低压电工考试真题汇总&#…...

网站建设的好处/信息流推广渠道

dp[i][j]dp[i-a[i][j]dp[i-a[i]][j-1]; 其中前一项j不变表示j这一项无限取转载于:https://www.cnblogs.com/linkzijun/p/6288364.html...

影视网站设计/汕头网站关键词推广

wellcms伪静态设置.为了保证更强劲的性能和节省流量&#xff0c;独立安装版自动对php文件和html进行压缩&#xff0c;考虑到js或jquery每个人在写代码的时候使用注释方式的不同&#xff0c;所以html默认压缩了除js以外的全部代码。如需开启压缩模式&#xff0c;请务必卸载非wel…...

中小型企业网站建设的资金流动/云搜索网页版入口

凡函数中以日期作为参数因子的&#xff0c;其中日期的形式都必须是 yyyy/mm/dd。而且必须用英文环境下双引号(" ")引用。1. DATEDATE(year,month,day)&#xff1a;返回一个表示某一特定日期的系列数。Year&#xff1a;代表年&#xff0c;可为一到四位数。Month&#…...

税务网站建设的建议/seo推广是什么意怿

1.队列组两种使用方法2.队列组等待 wait /** 新方法队列组一般用在在异步操作&#xff0c;在主线程写队列组毫无任何作用*/ - (void)GCD_Group_new_group___notify{dispatch_queue_t queue dispatch_queue_create("11", DISPATCH_QUEUE_CONCURRENT);dispatch_queue_…...

网页网站导读怎么做/seo文章代写平台

有些资料讲如果要支持目录必须使用iiswriter,或者其他软件&#xff0c;其实通过简单对iis配置&#xff0c;再利用urlwriter就可以完美解决url重写的问题可以将http://abc.domain.com/blog转向到http://www.domain.com/xxx.aspx?usernameabc当然首先要将主机的泛域名支持打开。…...