【微服务】mysql + elasticsearch数据双写设计与实现
目录
一、前言
二、为什么使用mysql+es双写
2.1 单用mysql的问题
2.2 为什么不直接使用es
2.2.1 非关系型表达
2.2.2 不支持事务
2.2.3 多字段将造成性能低下
三、mysql+es双写方案设计要点
3.1 全新设计 VS 中途调整架构
3.2 全表映射 VS 关键字段存储
3.2.1 最大程度发挥es性能
3.2.2 选择mysql还是es作为数据托底
3.3 数据一致性保障
3.3.1 同步双写
3.3.2 异步双写
3.3.3 定期同步
3.3.4 数据订阅
四、mysql+es双写方案数据迁移
4.1 数据迁移整体方案
4.1.1 创建索引
4.1.2 双写改造
4.1.3 数据迁移
4.1.4 搜索服务上线
4.2 数据迁移补充说明
五、方案实施
5.1 前置准备
5.1.1 搭建环境
5.1.2 创建数据表
5.1.3 插入初始化数据
5.1.4 创建一个索引
5.2 搭建springboot工程
5.2.1 引入基础依赖
5.2.2 核心配置文件
5.2.3 es客户端连接配置
5.2.3 mybatis文件
5.2.4 业务实现类
5.2.4 相关测试
5.3 双写业务实现
5.4 数据搜索
5.5 数据迁移
六、写在文末
一、前言
在很多电商网站中,对商品的搜索要求很高,主要体现在页面快速响应搜索结果。这就对服务端接口响应速度提出了很高的要求。而商品数据存储离不开mysql,在高并发场景下,尤其是数据规模达到一定量级,mysql的性能瓶颈一定会出现,为了满足极致的搜索速度,往往需要借助第三方存储,比如nosql数据库,当然主流的搭配还是使用搜索引擎来完成,于是在很多场景下,会选择mysql+elasticsearch来满足这个场景下对搜索的要求。
如下是一个典型的使用mysql+es实现数据双写的应用场景。
二、为什么使用mysql+es双写
2.1 单用mysql的问题
在很多互联网项目中,mysql数据库仍然是主流,毕竟关系型数据库可以处理现实场景中很多复杂的业务模型,但是mysql随着数据规模的增长,一旦单表数据量达到了千万级,性能将下降的很快,于是不得不进行数据库的扩展,这样也带来了架构上的复杂性,综合来说,在类似某宝,某东等这样的电商场景下,单表存储数据带来的问题主要如下:
-
单表数据承载有限,当数据规模超过千万就要考虑分库或分表,从而给数据库架构设计提出新的挑战;
-
mysql不适合全文检索,经管mysql从某个版本支持了全文检索,但是在实际使用中性能很弱;
-
mysql的模糊匹配无法满足多场景下的复杂的搜索要求,比如电商场景下,多维度任意组合搜索是很常用得,而复杂的搜索将会使得mysql性能急剧下降;
2.2 为什么不直接使用es
到这里也会有人提出疑问,既然es搜索速度如此高效,并且也可以存储数据,直接使用es存储mysql表中的数据不就行了。对于这个问题,主要从下面几点进行考虑,
2.2.1 非关系型表达
使用mysql进行数据库设计的一个好处就是,不同的表之间可以通过某个字段进行关联,关联关系的存在,让现实中复杂的业务模型通过表关联进行实现,而es则不支持不同索引之间的关联搜索。
2.2.2 不支持事务
mysql事务的存在,让数的写入完整性得到保障,而es是不支持事务的,这就导致在往es写数据时,数据的一致性需要通过其他的手段来保障。
2.2.3 多字段将造成性能低下
上面谈到,由于es不支持关联查询,实际业务中,一个页面展现的数据往往来自多张表的关联聚合查询结果,es为了达到与mysql同样的效果,只好尽可能在一个索引中冗余更多的字段,从es存储的角度来说,es是基于字段的,大行超多字段将会大大降低性能,同时也会导致后续数据的维护困难和复杂性。
三、mysql+es双写方案设计要点
在很多开发者看来,使用mysql+es双写的方案,就是把某个高频搜索的表的数据存储一份到es就可以了,这么理解倒也不错,不过还缺少很多深入的考虑。在正式开始设计方案之前,需要重点考虑下面几点,这也将是本文后续探讨的重点,以及在实际开发中需要关注的。
3.1 全新设计 VS 中途调整架构
这是一个很现实的摆在很多架构设计者面前的问题,为什么这么说呢,在很多企业的项目中,经历了从单体架构到微服务的改造,从简单的http调用,webservice调用到使用dubbo等服务治理的技术改造。
如果现在的你正在经历一个全新的项目,那么恭喜你,你可以拥有更多的技术选型空间,但是如果你正则经历项目的服务化改造,这个过程可能比较痛苦,不仅要考虑引入新技术的成本,更要考虑新技术的实现会给未来技术的演进带来何种影响,包括团队学习、维护成本,上线后的运维成本,与其他技术的融合成本等。
回到上面的问题,在使用mysql+es双写方案来说,同样会面临相同的难题,如果是全新的设计,主要考虑的是如何实现mysql与es双写数据的一致性,及如何基于团队成员现有的技术、业务上对双写数据实时性等方面,评估出以最低成本的实现方案即可。
而如果是中途更换设计方案,比如线上的数据规模已经达到千万量级,顶不住客户的压力到了不得不调整架构的阶段来考虑这个问题,这个过程将会拉得很长。此时你考虑的点会更多了,包括:
-
如何设计es索引?
-
如何基于现有的代码实现数据双写并且尽可能降低对现有逻辑的侵入性?
-
如何保障双写数据的一致性?
-
针对历史数据如何迁移?
-
如何减少生产上线后的实施成本和运维成本?
-
...
3.2 全表映射 VS 关键字段存储
使用过mysql的同学应该不陌生,mysql是行式存储数据,而es中,数据则以准json的结构存储,两者之间经管能够通过字段进行对应,但在检索的时候原理是不一样的,如下图所示。
在实际使用mysql+es进行双写方案设计时,很多人直接就认为,将mysql的表字段进行一份全量的拷贝到es的索引中即可,这样从实现上固然没有差别,最终也能达到效果,但这样做真的合理吗?在进行方案设计的时候,从实际经验来说,功能的实现固然重要,但如何做到既能满足功能,又能让设计显得合理才是更需要深入思考的。就这个问题来说,如何才算合理呢?可以从下面几点展开思考。
3.2.1 最大程度发挥es性能
不管是mysql,还是es,不管是hbase还是clickhouse...所有的数据存储介质,都有自己的优势和不足,因此在选择某种存储引擎时一定是利用其优势,同时规避其不足。就es来说,选择它的原因就是因为在海量的数据且复杂的检索场景下,仍然能够保持高性能。
在上文也谈到单纯使用es带来的不足,其中值得注意的一点就是,es是基于字段存储的,对一行数据来说,字段数量越多,当一个待检索的请求发来时,其计算耗费的成本必然越高,这不仅是针对es,甚至mysql等很多关系型数据库,对于单表过多字段的冗余设计也不推荐,所以对es来说,也不建议存储mysql表的所有字段,而是关键的具有重要业务意义的字段数据。
3.2.2 选择mysql还是es作为数据托底
这是一个架构设计中容易被忽略的问题。文章开始谈到,一个基本的业务场景是,主业务数据写入到mysql,同时将数据同步写入es,检索从es获取数据。那么问题来了,实际业务中,究竟以哪个数据为准呢?我们以下面一个简单的同步写入场景的业务逻辑为例来说明相信就能理解了。
@Transactional
public boolean save(){//数据组装try{//写入mysql//写入es}catch(Exception e){//es数据回滚}
}
这是一段同步双写的伪代码,从这段代码不难看出,mysql的写入由事务机制保障,但是es的数据写入与回滚就比较麻烦了,而且这样的实现对业务逻辑的侵入性强,维护性差,但可以发现,我们首要保障的是mysql数据的完整性,因为只有数据成功写入,界面上展示的数据才是正确的。
从这个分析结合实际的业务实现,以一个电商或类似的场景,从产品列表到具体的详情页面为例进行说明,参考下面的流程;
-
用户浏览列表页;
-
用户从列表页通过关键字搜索目标数据;
-
从搜到的结果中选择某个具体的产品;
-
进入具体产品的页,展示与当前产品完整的数据;
从上面的业务流程分析不难看出,实际要展示某个产品字段数据是非常多的,以某大型电商网站上面展现的某个产品为例,展现在用户面前的商品包括了非常多的数据,这些数据是多个源表经过服务端聚合以后再经过复杂的处理得到的,所以如果将这么多的字段放在es的某个索引中,这明显是不合适的,总结来说,两者搭配使用时可以遵循下面的思路:
-
es存放核心业务表的核心字段,比如产品ID,产品的详情描述,SKU等信息;
-
列表搜索走es索引,通过es的检索,返回业务主键等关键信息;
-
将第二步es得到的数据给到mysql的业务表,返回最终的数据给到页面;
从上面的分析来看,在实际业务中,应该酌情考虑是否应该将核心业务表的全量数据存于es,一般建议业务表的核心字段,比如业务主键 + 高频搜索的字段存放es中;
3.3 数据一致性保障
使用双写方案在实际操作中,基于双写方案,如何保障mysql与es的数据一致性是设计与开发过程中需要重点关注的。
我们知道,mysql有事务机制保障数据的一致性,而es没有事务,在上文的伪代码中仅仅是使用了一种非常简单的逻辑来保障,这样是远远不够的。一旦发生了mysql与es数据的不一致,带来的问题是很严重的。关于如何保障数据的一致性,结合实际操作经验,给出下面的几点建议:
3.3.1 同步双写
同步双写是保障数据一致性最简单的方式,也是实际操作中比较简单的操作方式,只需要将数据写到 MySQL 时,同时将数据写到 ES即可,通过mysql自身的事务机制间接保障两者数据一致性,其优缺点如下。
优点:
-
这种方式简单粗暴,实时写入能做到秒级。
缺点:
-
业务耦合,代码侵入性强,即在代码中需要写入mysql表的位置都需要加写入es的代码;
-
性能影响,同步写入两个存储,响应时间变长;
-
可能存在丢数据的风险;
3.3.2 异步双写
异步双写,即在数据写入mysql的同时,异步写入到es中,具体在实践过程中也有多种方式可以选择,下面提供几种方案。
异步线程
利用异步线程的方式,写入mysql的时候,开启多线程写入es;
内存队列
可以利用Java中提供的内存队列,写入mysql的同时向内存队列,比如BlockingQueue,另有一个线程消费内存队列中的数据写入es;
事件监听
主业务流程写入mysql的同时发布事件,另有一个事件订阅者订阅mysql写入事件,从而做到与主业务逻辑的解耦。
引入消息中间件
也可以考虑引入消息中间件,做到与主业务逻辑的彻底解耦,写入mysql的同时,向消息队列发送消息,另有服务消费者订阅消息消费,异步写入es ;
上述各种方式均可以在实践中使用,需要结合团队的技术储备,以及服务器资源,后续的运维成本等综合考虑。
3.3.3 定期同步
定期同步适合对搜索场景不那么敏感的业务,在这种场景下,可以考虑每隔一段时间,或每天的某些时间点进行同步,将数据批量从mysql写入到es中。定期同步的优缺点如下。
优点:
-
实现简单,系统资源占用少;
缺点:
-
实时性难以保证;
-
瞬时存储压力较大;
3.3.4 数据订阅
既要提高实时性,又要低入侵, 可以考虑利用 MySQL 的 Binlog 来进行同步。在很多数据同步工具中,都采用了类似的思想,简单来说,订阅mysql的binglog日志,然后通过回放binlog日志变化解析出变化的数据,从而进行数据同步。比如大家熟悉的canal就是很好的利用了这一点。
这种方式可以很好的与核心业务解耦,从而实现异步,总结来说,优点如下:
-
降低对主业务逻辑的代码侵入性;
-
数据的实时性好;
缺点:
-
对第三方组件存在一定的依赖性;
-
同步很难做到灵活性,很难对同步的数据做进一步的处理,比如同步时那些明显有问题的数据;
四、mysql+es双写方案数据迁移
对于一个全新的系统,结合上面考虑的要点,设计出一个相对完善的方案并落地实施不算难事,但是据个人经验,比较难的是中途引入es来补充和完善mysql的搜索能力上的短板。为什么这么说呢?
试想你的生产系统已经运行了很久了,mysql核心业务表也产生了相当量级的数据了。引入es之后,即便是双写,es中的存储的数据也是从某个时间点开始,搜索出来的数据也只有那个时间点之后的。那么之前的数据怎么办呢?肯定不能扔掉的。这时候就需考虑如何将之前mysql中老数据无损的迁入到es索引中。
这时候可能有人说这也不是什么难事吧,找个业务不繁忙的时间段将mysql中的老数据一次性迁移到es不就解决问题了吗?如果真是这么简单,就不会有那么多的麻烦事了,下面结合实践经验,从迁移的方案和迁移注意事项两方面进行说明。
4.1 数据迁移整体方案
以一个对数据搜索场景不是那么敏感的场景为例进行说明。整体业务流程如下:
结合上面的流程,完整的数据迁移思路如下:
-
创建索引;
-
双写方案V1版生产上线(不包括es搜索),业务数据实现mysql+es双写,考虑使用消息中间件,记录时间点为T1;
-
在完成数据迁移之前,搜索业务逻辑仍然走mysql,此时es索引中存储的是T1时间点开始之后的mysql数据;
-
业务低峰期,利用数据同步工具或FlinkCDC等方案第一次完成全量迁移,针对T1之前的;
-
双写方案V2版生产上线,数据搜索走es;
4.1.1 创建索引
建议自定义创建索引,控制索引中的字段信息,结合上面谈到的要点,es索引存储的字段信息为mysql核心业务表中的核心业务字段,比如业务主键,用于搜索的高频字段信息。
4.1.2 双写改造
稳妥起见,在第一个改造发布的版本中,代码逻辑层面先支持双写,比如通过异步线程将数据写入es,此时es索引中就存储了某个时间点T1之后的数据。
4.1.3 数据迁移
使用数据迁移工具或自己开发一个微服务,在业务低峰期(凌晨2点)完成一次全量数据的迁移,迁移完成后,ES中的数据基本与mysql表数据同步了。
4.1.4 搜索服务上线
上线搜索服务,此时数据的搜索将走es,具体的实现逻辑结合自身的业务场景酌情改造。比如上文谈到的,如果产品的详情页面是多个表的聚合结果,首先需要通过搜索得到核心的业务字段信息,然后代入到后面的逻辑中进行数据的组装。
4.2 数据迁移补充说明
以上结合实际场景给出了一个相对通用的数据迁移方案,在实际操作中,遇到的情况可能比这个更复杂,比如你可能遇到下面的这些情况:
-
你要迁移的数据表经过了分库分表,即业务表的数据存储在多个库或多张表中,这种情况下如何迁移?;
-
你要迁移的数据表数据量非常大,而且可以预计每月的增长量为几百万,如何保障保证es的存储容量?如何规划es的后续扩容?;
-
迁移的数据量巨大,需要很久怎么办?
-
迁移数据量巨大,迁移过程中发生异常怎么办?
-
...
五、方案实施
下面通过实际代码演示一下完整的业务流程。
5.1 前置准备
5.1.1 搭建环境
这里假设你已经提前搭建好es、mysql的环境。
es的搭建可以参考文章:es脚本编程使用,mysql可以使用下面的docker命令快速开启mysql服务
docker run -p 3307:3306 --name mysql57 \
-v /usr/local/docker/mysql/data:/var/lib/mysql \
-v /usr/local/docker/mysql/conf:/etc/mysql/conf.d \
-v /usr/local/docker/mysql/log:/var/log/mysql \
-e MYSQL_ROOT_PASSWORD=你的root密码\
-d mysql:5.7
5.1.2 创建数据表
使用下面的sql语句创建一张数据表,其中desc字段会被作为高频字段搜索使用
CREATE TABLE `product` (`id` int(12) NOT NULL,`pro_name` varchar(64) DEFAULT NULL,`pro_no` varchar(32) DEFAULT NULL,`price` int(10) DEFAULT NULL,`category` varchar(32) DEFAULT NULL,`stock` int(32) DEFAULT NULL,`desc` varchar(255) DEFAULT NULL,PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
5.1.3 插入初始化数据
为上述的表插入一些数据
INSERT INTO `pt_res`.`product`(`id`, `pro_name`, `pro_no`, `price`, `category`, `stock`, `desc`) VALUES (1, '小米14', 'A100', 3999, 'phone', 32, 'xiao mi phone');
INSERT INTO `pt_res`.`product`(`id`, `pro_name`, `pro_no`, `price`, `category`, `stock`, `desc`) VALUES (2, 'Java入门到精通', 'B100', 56, 'book', 12, 'Java technology');
INSERT INTO `pt_res`.`product`(`id`, `pro_name`, `pro_no`, `price`, `category`, `stock`, `desc`) VALUES (3, '精品男鞋', 'X100', 325, 'shoe', 82, 'Man shoe');
5.1.4 创建一个索引
创建一个名为product的索引,并指定desc字段分词,里面的字段与mysql表对应,但不是所有字段;
PUT product
{"mappings": {"properties": {"id":{"type": "long"},"pro_name": {"type": "keyword"},"desc": {"type": "text"}}}
}
测试创建一条数据
PUT /product/_doc/11
{"pro_name":"汪汪队纪念品","desc":"for children play"
}
查询这条数据
GET /product/_doc/11
到这里,我们的准备工作就完成了,接下来将在代码中完成剩下的操作。
5.2 搭建springboot工程
本工程要做的事情如下:
- 整合mybatis,与es;
- 利用mybatis实现增删改查功能;
- 利用异步线程写入es;
- 实现mysql历史数据的迁移;
5.2.1 引入基础依赖
<dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><scope>runtime</scope></dependency><dependency><groupId>org.mybatis.spring.boot</groupId><artifactId>mybatis-spring-boot-starter</artifactId><version>2.1.4</version></dependency><dependency><groupId>org.elasticsearch</groupId><artifactId>elasticsearch</artifactId><version>7.6.2</version></dependency><dependency><groupId>org.elasticsearch.client</groupId><artifactId>elasticsearch-rest-high-level-client</artifactId><version>7.6.2</version></dependency><!--<dependency><groupId>org.elasticsearch.client</groupId><artifactId>elasticsearch-rest-client</artifactId><version>7.6.2</version></dependency>--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId><version>${boot-web.version}</version></dependency><dependency><groupId>cn.hutool</groupId><artifactId>hutool-all</artifactId><version>5.8.15</version></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>${lomok.version}</version></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.83</version></dependency></dependencies>
5.2.2 核心配置文件
主要配置mysql,mybatis以及es相关的连接信息
server:port: 8082spring:datasource:username: rootpassword: rooturl: jdbc:mysql://IP:3307/pt_res?serverTimezone=UTC&useUnicode=true&characterEncoding=utf-8&useSSL=falsedriver-class-name : com.mysql.jdbc.Driverelasticsearch:rest:uris: [IP:9200]host: IPport: 9200mybatis:mapper-locations: classpath:mybatis/*.xmltype-aliases-package: com.congge.entity
5.2.3 es客户端连接配置
自定义一个类,自定义一个RestHighLevelClient 的bean,配置es连接信息
import lombok.extern.slf4j.Slf4j;
import org.apache.http.HttpHost;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
@Slf4j
public class EsConfig {@Value("${spring.elasticsearch.host}")private String host;@Value("${spring.elasticsearch.port}")private int port;@Bean(name = "restHighLevelClient")public RestHighLevelClient restHighLevelClient() {return new RestHighLevelClient(RestClient.builder(new HttpHost(host, port, "http")));}}
5.2.3 mybatis文件
在resources目录下创建mybatis目录,在里面编写与mysql操作的文件,这里创建一个操作product表的xml文件
<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE mapperPUBLIC "-//mybatis.org//DTD Mapper 3.0//EN""http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.congge.dao.ProductDao"><resultMap id="BaseResultMap" type="com.congge.entity.Product"><id column="id" property="id" jdbcType="VARCHAR" /><result column="pro_name" property="proName" jdbcType="VARCHAR" /><result column="pro_no" property="proNo" jdbcType="VARCHAR" /><result column="price" property="price" jdbcType="INTEGER" /><result column="category" property="category" jdbcType="VARCHAR" /><result column="stock" property="stock" jdbcType="INTEGER" /><result column="desc" property="desc" jdbcType="VARCHAR" /></resultMap><select id="getAll" resultMap="BaseResultMap">select * from product</select>
</mapper>
注意启动类上面添加dao包的扫描
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;@SpringBootApplication
@MapperScan("com.congge.dao")
public class SyncApp {public static void main(String[] args) {SpringApplication.run(SyncApp.class,args);}}
5.2.4 业务实现类
@Service
public class ProductServiceImpl implements ProductService {@Autowiredprivate ProductDao productDao;@Overridepublic List<Product> getAll() {return productDao.getAll();}
}
5.2.4 相关测试
框架整合完毕之后,及时通过单元测试验证是否整合成功,下面给出了一些关于mysql操作以及索引操作的单元测试用例
import com.alibaba.fastjson.JSONObject;
import com.congge.SyncApp;
import com.congge.entity.Product;
import com.congge.entity.es.ProductInfo;
import com.congge.service.ProductService;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.IndicesClient;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.client.indices.GetIndexRequest;
import org.elasticsearch.client.indices.GetIndexResponse;
import org.elasticsearch.cluster.metadata.MappingMetaData;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;import java.util.List;
import java.util.Map;//@RunWith(SpringRunner.class)
@SpringBootTest(classes = {SyncApp.class})
public class EsTest {@Autowiredprivate RestHighLevelClient restHighLevelClient;@Autowiredprivate ProductService productService;@Testpublic void testFindAll(){List<Product> all = productService.getAll();System.out.println(all);}@org.junit.jupiter.api.Testvoid contextLoads() {System.out.println(restHighLevelClient);}/*** 判断索引是否存在*/@Testpublic void getIndex() throws Exception {IndicesClient indices = restHighLevelClient.indices();GetIndexRequest student0517 = new GetIndexRequest("product");boolean exists = indices.exists(student0517, RequestOptions.DEFAULT);if(exists){GetIndexResponse indexResponse = indices.get(student0517, RequestOptions.DEFAULT);Map<String, MappingMetaData> mappings = indexResponse.getMappings();System.out.println(mappings);}else{System.out.println("索引不存在");}}@Testpublic void getDocById() throws Exception {GetRequest getRequest = new GetRequest("product").id("11");GetResponse documentFields = restHighLevelClient.get(getRequest, RequestOptions.DEFAULT);//集合方式Map<String, Object> source = documentFields.getSource();for (String key : source.keySet()) {System.out.println(source.get(key));}//字符串 -----JSONString sourceAsString = documentFields.getSourceAsString();System.out.println(sourceAsString);//把JSON转换为 stuent//JSON字符串-->JSON对象JSONObject jsonObject = JSONObject.parseObject(sourceAsString);System.out.println(jsonObject);}@Testpublic void getDocByIdV2() throws Exception {SearchRequest searchRequest = new SearchRequest();searchRequest.indices("product");SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();searchRequest.source(searchSourceBuilder.query(QueryBuilders.termQuery("_id", 11)));searchSourceBuilder.size(1);SearchResponse search = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);SearchHits searchHits = search.getHits();for (SearchHit searchHit : searchHits) {Map<String, Object> sourceMap = searchHit.getSourceAsMap();System.out.println(sourceMap);}}@Testpublic void insertDoc() throws Exception {com.fasterxml.jackson.databind.ObjectMapper objectMapper = new com.fasterxml.jackson.databind.ObjectMapper();IndexRequest indexRequest = new IndexRequest("product");ProductInfo pro = new ProductInfo();pro.setId(13);pro.setPro_name("MP3");pro.setDesc("music player");String proData = objectMapper.writeValueAsString(user);indexRequest.source(proData,XContentType.JSON);//插入数据IndexResponse response = restHighLevelClient.index(indexRequest, RequestOptions.DEFAULT);System.out.println(response.status());System.out.println(response.getResult());}}
5.3 双写业务实现
按照上文的业务实现流程,向mysql表插入一条数据,同时写入一条数据到es
@Override@Transactionalpublic Object save(Product product) {productDao.save(product);saveEs(product)//CompletableFuture.runAsync(() -> saveEs(product), newCachedThreadPool());return product.getId();}public void saveEs(Product product){com.fasterxml.jackson.databind.ObjectMapper objectMapper = new com.fasterxml.jackson.databind.ObjectMapper();IndexRequest indexRequest = new IndexRequest("product");ProductInfo pro = new ProductInfo();pro.setId(product.getId());pro.setPro_name(product.getProName());pro.setDesc(product.getDesc());String productData = null;try {productData = objectMapper.writeValueAsString(pro);} catch (JsonProcessingException e) {e.printStackTrace();}indexRequest.source(productData,XContentType.JSON);//插入数据IndexResponse response = null;try {response = restHighLevelClient.index(indexRequest, RequestOptions.DEFAULT);} catch (IOException e) {log.error("save to es error,error : 【{}】",e.getMessage());e.printStackTrace();}System.out.println(response.status());System.out.println(response.getResult());}/*** 带有缓存功能线程池** @return*/public static ExecutorService newCachedThreadPool() {return new ThreadPoolExecutor(0, Integer.MAX_VALUE,60L, TimeUnit.SECONDS,new SynchronousQueue<Runnable>());}
使用单元测试测试一下方法
@Testpublic void testSave(){Product product = new Product();product.setId(6);product.setProName("可比克薯片");product.setProNo("F003");product.setPrice(7);product.setCategory("food");product.setStock(33);product.setDesc("classics food");Object save = productService.save(product);System.out.println(save);}
跑通之后,检查mysql与es的数据是否正常写入
5.4 数据搜索
我们假设用户输入关键字进行搜索,首先通过es的检索,得到表的基本关键字段,比如id,然后去mysql中查询完整的信息,核心业务实现逻辑如下。
@Overridepublic List<Product> query(String key) {List<Integer> result = queryFromEs(key);List<Product> queryRes = null;if(!CollectionUtils.isEmpty(result)){queryRes = productDao.getProductIn(result);}return queryRes;}private List<Integer> queryFromEs(String key) {SearchRequest request = new SearchRequest();request.indices("product");SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();FuzzyQueryBuilder fuzzyQueryBuilder =QueryBuilders.fuzzyQuery("desc", key).fuzziness(Fuzziness.ONE);sourceBuilder.query(fuzzyQueryBuilder);request.source(sourceBuilder);SearchResponse response = null;try {response = restHighLevelClient.search(request, RequestOptions.DEFAULT);} catch (IOException e) {e.printStackTrace();}System.out.println(response.getHits().getHits());System.out.println(response.getHits().getTotalHits());SearchHits hits = response.getHits();List<Integer> ids = new ArrayList<>();for (SearchHit searchHit : hits){Map<String, Object> sourceAsMap = searchHit.getSourceAsMap();System.out.println(sourceAsMap);ids.add(Integer.valueOf(sourceAsMap.get("id").toString()));}return ids;}
编写单元测试用例
@Testpublic void query(){List<Product> res = productService.query("food");System.out.println(res);}
事实上,实际业务中,从es中查出了id等信息之后,需要通过id字段去mysql中进行多表关联的查询才能聚合结果,但是走es的搜索之后,可以大大提升获取id的性能
5.5 数据迁移
简单起见,这里直接使用定时任务做数据同步,可以考虑凌晨的时候来做这件事,核心迁移方法
public void doSync() {//设置一个时间点的条件作为同步数据的边界List<Product> syncDatas = productDao.getSyncDatas();for(Product product :syncDatas ){ObjectMapper objectMapper = new ObjectMapper();IndexRequest indexRequest = new IndexRequest("product");ProductInfo productInfo = new ProductInfo();productInfo.setId(product.getId());productInfo.setPro_name(product.getProName());productInfo.setDesc(product.getDesc());String proData = null;try {proData = objectMapper.writeValueAsString(productInfo);} catch (JsonProcessingException e) {e.printStackTrace();}indexRequest.source(proData,XContentType.JSON);//插入数据try {IndexResponse response = restHighLevelClient.index(indexRequest, RequestOptions.DEFAULT);} catch (IOException e) {e.printStackTrace();}}log.info("同步完成");}
最后增加一个定时任务的类,将上述的方法添加进去
import com.congge.service.ProductService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;@Configuration
@EnableScheduling
public class SyncTask {@Autowiredprivate ProductService productService;@Scheduled(cron = "0/2 * * * * ?")private void configureTasks(){System.out.println("开始执行数据同步");productService.doSync();System.out.println("数据同步完成");}}
六、写在文末
本文通过较大的篇幅详细讨论了mysql与es实现双写的设计以及实现过程,当然在实际操作过程中还有很多值得探讨和细节,希望为看到的小伙伴提供一个思路,本篇到此结束,感谢观看。
相关文章:

【微服务】mysql + elasticsearch数据双写设计与实现
目录 一、前言 二、为什么使用mysqles双写 2.1 单用mysql的问题 2.2 为什么不直接使用es 2.2.1 非关系型表达 2.2.2 不支持事务 2.2.3 多字段将造成性能低下 三、mysqles双写方案设计要点 3.1 全新设计 VS 中途调整架构 3.2 全表映射 VS 关键字段存储 3.2.1 最大程度…...

《向量数据库指南》——用了解向量数据库Milvus Cloud搭建高效推荐系统
了解向量数据库 ANN 搜索是关系型数据库无法提供的功能。关系型数据库只能用于处理具有预定义结构、可直接比较值的表格型数据。因此,关系数据库索引也是基于这一点来比较数据。但是 Embedding 向量无法通过这种方式直接相互比较。因为我们不知道向量中的每个值代表什么意思,…...

EtherCAT主站SOEM -- 4 -- SOEM之ethercatprint.h/c文件解析
EtherCAT主站SOEM -- 4 -- SOEM之ethercatprint.h/c文件解析 一 ethercatprint.h/c文件功能预览:二 ethercatprint.h/c 文件的主要函数的作用:2.1.1 char* ec_sdoerror2string(uint32 sdoerrorcode)2.1.2 char* ec_ALstatuscode2string(uint16 ALstatusc…...

Redis01-缓存击穿、穿透和雪崩
目录 开场白-追命3连 使用场景 01缓存穿透场景与方案 02布隆过滤器 03缓存击穿场景与方案 04缓存雪崩场景与方案 开场白-追命3连 看你项目中有说用到Redis,都是哪些场景使用了Redis呢? 如果发生了缓存穿透、击穿、雪崩如何应对呢?缓存…...

multiple kernel learning(MKL)多核学习
历史上之所以会出现多核学习(MKL)这个词,是因为在深度学习流行起来以前,kernel是处理非线性的默认方法,那个年代优化一个非线性函数不容易,每加一层复杂性可能就需要多设计一个优化算法,MKL就是…...

JS匿名函数之函数表达式与立即执行函数
匿名函数是什么?和具名函数有什么区别?让我为大家介绍一下吧! 没有名字的函数,无法直接使用 一.函数表达式 将匿名函数赋值给一个变量,并且通过变量名去调用,我们将这个称为函数表达式 语法: …...

WebGL:基础练习 / 简单学习 / demo / canvas3D
一、前置内容 canvas:理解canvas / 基础使用 / 实用demo-CSDN博客 WebGL:开始学习 / 理解 WebGL / WebGL 需要掌握哪些知识 / 应用领域 / 前端值得学WebGL吗_webgl培训-CSDN博客 二、在线运行HTML 用来运行WebGL代码,粘贴--运行ÿ…...

Python基础入门例程44-NP44 判断列表是否为空(条件语句)
最近的博文: Python基础入门例程43-NP43 判断布尔值(条件语句)-CSDN博客 Python基础入门例程42-NP42 公式计算器(运算符)-CSDN博客 Python基础入门例程41-NP41 二进制位运算(运算符)-CSDN博客…...

【每日一题Day369】LC187重复的DNA序列 | 字符串哈希
重复的DNA序列【LC187】 DNA序列 由一系列核苷酸组成,缩写为 A, C, G 和 T.。 例如,"ACGAATTCCG" 是一个 DNA序列 。 在研究 DNA 时,识别 DNA 中的重复序列非常有用。 给定一个表示 DNA序列 的字符串 s ,返回所有在 DNA…...

服务器密码机主要功能及特点 安当加密
服务器密码机的主要功能包括: 数据加密:密码机使用各种加密算法对数据进行加密,确保只有拥有正确密钥的接收者才能解密和查看数据。数据解密:密码机使用相应的解密算法和密钥对已加密的数据进行解密,使其恢复成原始数据…...

RIP路由配置
RIP路由配置步骤与命令: 1.启用RIP路由:router rip 2.通告直连网络:network 直连网络 3.启用RIPv2版本:version 2 4.禁用自动汇总:no auto-summary 注意:静态路由通告远程网络,动态路由通告…...

尚硅谷Docker基础篇和Dockerfile超详细整合笔记
Docker基础篇DockerFile Docker:您要如何确保应用能够在这些环境中运行和通过质量检测?并且在部署过程中不出现令人头疼的版本、配置问题,也无需重新编写代码和进行故障修复?而这个就是使用容器。Docker解决了运行环境和配置问题…...

JavaScript_Date对象_实例方法_get类
计算这一年还剩多少天: <!DOCTYPE html> <html lang"en"> <head> <meta charset"UTF-8"> <meta name"viewport" content"widthdevice-width, initial-scale1.0"> <title>Document&…...

Go语言在区块链开发中的应用
引言 区块链是近年来备受关注的技术领域,它不仅改变了传统的数据交换和存储方式,还为各种应用场景提供了全新的解决方案。而Go语言(Golang)作为一门简洁、高效的编程语言,正逐渐成为开发区块链应用的首选语言。本文将…...

S4.2.4.5 Fast Training Sequence (FTS)
一 本章节主讲知识点 1.1 FTS的用途和实现注意 二 本章节原文翻译 Fast Training Sequence (FTS) 主要用于在L0s->L0跳转的过程中,让Receiver 检测到电气空闲退出,以及实现bit 和 symbol lock。 2.1 Gen1 and Gen2 速率 对于Gen1/2 FTS的组成如下…...

Gitlab CICD实用技巧汇总
关于.gitlab-ci.yml的实用配置 1、stage参数 stages: - build - test - deploy 相同stage的作业会并行执行,有一个失败,则认为这个stage失败。 不同stage的作业会按序执行,前面stage有失败,后续stage不会继续执行。 可以使用ne…...

JavaSpringbootMySQL高校实训管理平台01557-计算机毕业设计项目选题推荐(附源码)
目 录 摘要 1 绪论 1.1 研究背景 1.2 研究意义 1.3论文结构与章节安排 2 高校实训管理平台系统分析 2.1 可行性分析 2.2 系统流程分析 2.2.1 数据增加流程 2.2.2 数据修改流程 2.2.3 数据删除流程 2.3 系统功能分析 2.3.1 功能性分析 2.3.2 非功能性分析 2.4 系…...

初阶JavaEE(14)表白墙程序
接上次博客:初阶JavaEE(13)(安装、配置:Smart Tomcat;访问出错怎么办?Servlet初识、调试、运行;HttpServlet:HttpServlet;HttpServletResponse)-C…...

算法设计与分析第二章作业
1. 描述最大字段和的分治算法 题目 思路 判断最大子段和,可以用分治的思想,每次将序列一分为二,选择两个序列的最大子段和。 但是这里还有一种可能,就是子段可以横跨两个子序列,所以我们的最大子段和就是࿱…...

《视觉SLAM十四讲》-- 三维空间的刚体运动
文章目录 02 三维空间的刚体运动2.0 机器人位姿表述2.1 点和坐标系2.1.1 三维坐标系有关表述2.1.2 坐标系变换 2.2 旋转向量和欧拉角2.2.1 旋转向量2.2.2 欧拉角 2.3 四元数2.3.1 四元数的定义2.3.2 四元数的计算2.3.3 四元数表示旋转2.3.4 四元数与其他旋转表示法的转换 2.4 相…...

关于iOS:如何使用SwiftUI调整图片大小?
How to resize Image with SwiftUI? 我在Assets.xcassets中拥有很大的形象。 如何使用SwiftUI调整图像大小以缩小图像? 我试图设置框架,但不起作用: 1 2 Image(room.thumbnailImage) .frame(width: 32.0, height: 32.0) 在Image上应用…...

【MySQL】数据库MySQL基础知识与操作
作者主页:paper jie_博客 本文作者:大家好,我是paper jie,感谢你阅读本文,欢迎一建三连哦。 本文录入于《MySQL》专栏,本专栏是针对于大学生,编程小白精心打造的。笔者用重金(时间和精力)打造&a…...

vim手册(vim cheatsheet)
vim手册(vim cheatsheet) 1. 命令模式 1). 移动光标 在命令模式下,可以使用以下命令来移动光标: - h:向左移动一个字符。 - j:向下移动一行。 - k:向上移动一行。 - l:向右移动一个…...

软件测试具体人员分工
最近看了点敏捷测试的东西,看得比较模糊。一方面是因为没有见真实的环境与流程,也许它跟本就没有固定的模式与流程,它就像告诉人们要“勇敢”“努力”。有的人在勇敢的面对生活,有些人在勇敢的挑战自我,有些人在勇敢的…...

计算机网络-应用层
文章目录 应用层协议原理万维网和HTTP协议万维网概述统一资源定位符HTML文档 超文本传输协议(HTTP)HTTP报文格式请求报文响应报文cookie 万维网缓存与代理服务器 DNS系统域名空间域名服务器和资源记录域名解析过程递归查询迭代查询 动态主机配置协议&…...

linux 创建git项目并提交到gitee(保姆式教程)
01、git安装与初始化设置 mhzzjmhzzj-virtual-machine:~/work/skynetStudy$ apt install mhzzjmhzzj-virtual-machine:~/work/skynetStudy$ git config --global user.name "用户名" mhzzjmhzzj-virtual-machine:~/work/skynetStudy$ git config --global user.ema…...

STM32 IAP应用开发--bootloader升级程序
STM32 IAP应用开发--bootloader升级程序 Chapter1 STM32 IAP应用开发——通过串口/RS485实现固件升级(方式2)前言什么是IAP?什么是BootLoader? 方案介绍:1)bootloader部分:2)APP部分…...

Q_GLOBAL_STATIC宏
文章目录 目的Q_GLOBAL_STATIC源代码分析涉及到原子操作 以及静态变量初始化顺序代码实现 目的 由Q_GLOBAL_STATIC宏, 引发的基于线程安全的Qt 单例模式的使用。 Q_GLOBAL_STATIC /***************************************************************************…...

[批处理]_[初级]_[如何删除变量值里的双引号]
场景 在使用Visual Studio开发本地程序的时,需要在项目属性,生成事件->生成后事件里增加一些资源的打包,复制,删除等操作,那么就需要用到批处理来进行。而传递带空格的路径给外部的批处理文件时就需要双引号引用从…...

51单片机电子钟闹钟温度LCD1602液晶显示设计( proteus仿真+程序+原理图+设计报告+讲解视频)
51单片机电子钟闹钟温度液晶显示设计( proteus仿真程序原理图设计报告讲解视频) 1.主要功能:2.仿真3. 程序代码4. 原理图5. 设计报告6. 设计资料内容清单&&下载链接资料下载链接(可点击): 🌟51单片…...