Spring Boot+RabbitMQ+Canal 解决数据一致性
目录大纲
- 一、环境配置
- 1.1 docker-compose.yml 配置
- 1.2 docker-compose 常用命令
- 1.3 镜像服务启动状态
- 二、MySQL binlog 配置
- 2.1 docker-compose command 配置 binlog
- 2.2 创建canal用户,以及查看是否开启binlog
- 三、canal 相关配置文件
- 3.1 canal.properties 完整文件
- 3.2 instance.properties 完整文件
- 3.3 检查配置是否与宿主机一致
- 3.4 开启相关端口防火墙配置
- 四、代码实现
- 4.1 相关pom依赖引入
- 4.2 完整pom.xml
- 4.3 application.yml 配置
- 4.4 完整application.yml配置
- 4.5 RabbitConstants 基础常量配置
- 4.6 CanalMqConfigure MQ队列交换机配置
- 4.7 CanalConsumer 消费者
- 五、运行与测试
一、环境配置
1.1 docker-compose.yml 配置
version: '3.8'services:redis:container_name: redisimage: redis:6.2.7restart: alwaysnetworks:- app_netports:- "6379:6379"volumes:- /usr/local/docker/redis/data:/data- /usr/local/docker/redis/config/redis.conf:/usr/local/redis/config/redis.conf- /usr/local/docker/redis/logs:/logscommand: [ "redis-server","/usr/local/redis/config/redis.conf" ]mysql:container_name: mysqlimage: mysql:8.0.30restart: alwaysnetworks:- app_netports:- "3306:3306"volumes:- /usr/local/docker/mysql/data:/var/lib/mysql- /usr/local/docker/mysql/config:/etc/mysql/conf.denvironment:MYSQL_ROOT_PASSWORD: rootTZ: Asia/Shanghaicommand:--default-authentication-plugin=mysql_native_password--character-set-server=utf8mb4--collation-server=utf8mb4_general_ci--explicit_defaults_for_timestamp=true--lower_case_table_names=1--log-bin=/var/lib/mysql/mysql-bin--server-id=1--binlog-format=ROW--expire_logs_days=7--max_binlog_size=500Mcanal:image: canal/canal-server:v1.1.5container_name: canalrestart: alwaysports:- 11110:11110- 11111:11111- 11112:11112volumes:- /usr/local/docker/canal/conf/canal.properties:/home/admin/canal-server/conf/canal.properties- /usr/local/docker/canal/conf/instance.properties:/home/admin/canal-server/conf/example/instance.properties- /usr/local/docker/canal/logs:/home/admin/canal-server/logsnetworks:- app_netdepends_on:- mysql- rabbitmqrabbitmq:image: rabbitmq:3-managementcontainer_name: rabbitmqrestart: alwaysports:- "5672:5672"- "15672:15672"volumes:- /usr/local/docker/rabbitmq/data/:/var/lib/rabbitmq/- /usr/local/docker/rabbitmq/log/:/var/log/rabbitmq/environment:- RABBITMQ_DEFAULT_USER=guest- RABBITMQ_DEFAULT_PASS=guestnetworks:- app_netnetworks:app_net:driver: bridge
1.2 docker-compose 常用命令
# 后台启动容器编排文件
docker-compose up -d [service]# 停止up命令所启动的容器,并移除网络
docker-compose down# 进入指定容器
docker-compose exec [service]# 列出项目中所有的容器
docker-compose ps [service]# 重启项目中容器
docker-compose restart [service]# 删除项目中所有容器
docker-compose rm -f [service]# 启动项目中容器(或指定容器)
docker-compose start [service]# 暂停项目中容器(或指定容器)
docker-compose stop [service]
1.3 镜像服务启动状态
[root@lavm-13jmyj9ugf docker]# docker ps -a
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
0d4260bc557b canal/canal-server:v1.1.5 "/alidata/bin/main.s…" 38 minutes ago Up 38 minutes 9100/tcp, 0.0.0.0:11110-11112->11110-11112/tcp, :::11110-11112->11110-11112/tcp canal
c66b3f1f13a9 mysql:8.0.30 "docker-entrypoint.s…" 38 minutes ago Up 38 minutes 0.0.0.0:3306->3306/tcp, :::3306->3306/tcp, 33060/tcp mysql
645e27bd4001 rabbitmq:3-management "docker-entrypoint.s…" 5 hours ago Up 49 minutes 4369/tcp, 5671/tcp, 0.0.0.0:5672->5672/tcp, :::5672->5672/tcp, 15671/tcp, 15691-15692/tcp, 25672/tcp, 0.0.0.0:15672->15672/tcp, :::15672->15672/tcp rabbitmq
f55d42cbbd8e redis:6.2.7 "docker-entrypoint.s…" 3 days ago Up 49 minutes 0.0.0.0:6379->6379/tcp, :::6379->6379/tcp redis
二、MySQL binlog 配置
2.1 docker-compose command 配置 binlog
--log-bin=/var/lib/mysql/mysql-bin
--server-id=1
--binlog-format=ROW
--expire_logs_days=7
--max_binlog_size=500M
2.2 创建canal用户,以及查看是否开启binlog
mysql> CREATE USER canal IDENTIFIED BY 'canal';
Query OK, 0 rows affected (0.05 sec)mysql> GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
Query OK, 0 rows affected (0.05 sec)mysql> FLUSH PRIVILEGES;
Query OK, 0 rows affected (0.05 sec)mysql> select * from mysql.user where User = 'canal';
+------+-------+-------------+-------------+-------------+-------------+-------------+-----------+-------------+---------------+--------------+-----------+------------+-----------------+------------+------------+--------------+------------+-----------------------+------------------+--------------+-----------------+------------------+------------------+----------------+---------------------+--------------------+------------------+------------+--------------+------------------------+----------+------------+-------------+--------------+---------------+-------------+-----------------+----------------------+-----------------------+-------------------------------------------+------------------+-----------------------+-------------------+----------------+------------------+----------------+------------------------+---------------------+--------------------------+-----------------+
| Host | User | Select_priv | Insert_priv | Update_priv | Delete_priv | Create_priv | Drop_priv | Reload_priv | Shutdown_priv | Process_priv | File_priv | Grant_priv | References_priv | Index_priv | Alter_priv | Show_db_priv | Super_priv | Create_tmp_table_priv | Lock_tables_priv | Execute_priv | Repl_slave_priv | Repl_client_priv | Create_view_priv | Show_view_priv | Create_routine_priv | Alter_routine_priv | Create_user_priv | Event_priv | Trigger_priv | Create_tablespace_priv | ssl_type | ssl_cipher | x509_issuer | x509_subject | max_questions | max_updates | max_connections | max_user_connections | plugin | authentication_string | password_expired | password_last_changed | password_lifetime | account_locked | Create_role_priv | Drop_role_priv | Password_reuse_history | Password_reuse_time | Password_require_current | User_attributes |
+------+-------+-------------+-------------+-------------+-------------+-------------+-----------+-------------+---------------+--------------+-----------+------------+-----------------+------------+------------+--------------+------------+-----------------------+------------------+--------------+-----------------+------------------+------------------+----------------+---------------------+--------------------+------------------+------------+--------------+------------------------+----------+------------+-------------+--------------+---------------+-------------+-----------------+----------------------+-----------------------+-------------------------------------------+------------------+-----------------------+-------------------+----------------+------------------+----------------+------------------------+---------------------+--------------------------+-----------------+
| % | canal | Y | N | N | N | N | N | N | N | N | N | N | N | N | N | N | N | N | N | N | Y | Y | N | N | N | N | N | N | N | N | | | | | 0 | 0 | 0 | 0 | mysql_native_password | *E3619321C1A937C46A0D8BD1DAC39F93B27D4458 | N | 2025-03-10 11:53:49 | NULL | N | N | N | NULL | NULL | NULL | NULL |
+------+-------+-------------+-------------+-------------+-------------+-------------+-----------+-------------+---------------+--------------+-----------+------------+-----------------+------------+------------+--------------+------------+-----------------------+------------------+--------------+-----------------+------------------+------------------+----------------+---------------------+--------------------+------------------+------------+--------------+------------------------+----------+------------+-------------+--------------+---------------+-------------+-----------------+----------------------+-----------------------+-------------------------------------------+------------------+-----------------------+-------------------+----------------+------------------+----------------+------------------------+---------------------+--------------------------+-----------------+
1 row in set (0.08 sec)mysql> show variables like 'log_bin';
+---------------+-------+
| Variable_name | Value |
+---------------+-------+
| log_bin | ON |
+---------------+-------+
1 row in set (0.30 sec)
三、canal 相关配置文件
canal.properties
主要核心配置
canal.serverMode=rabbitMQ
:选择 RabbitMQ 作为通知服务模型。
rabbitmq.host=rabbitmq
:基于 Docker 同一网络下,可以使用容器名称代替 host。
rabbitmq.queue
、rabbitmq.routingKey
、rabbitmq.exchange
: RabbitMQ 的三件套,用于后续创建具体通道监听。
#################################################
######### common argument #############
#################################################
# tcp, kafka, rocketMQ, rabbitMQ, pulsarMQ 支持的服务模型,tcp直连或mq,此处我选择RabbitMQ
canal.serverMode=rabbitMQ##################################################
######### RabbitMQ #############
##################################################
rabbitmq.host=rabbitmq
rabbitmq.virtual.host=/
rabbitmq.exchange=canal-exchange
rabbitmq.username=guest
rabbitmq.password=guest
rabbitmq.queue=canal-queue
rabbitmq.routingKey=canal-routing-key
rabbitmq.deliveryMode=
instance.properties
主要核心配置
canal.instance.master.address
数据库地址
canal.instance.dbUsername
数据库用户名
canal.instance.dbPassword
数据库密码
canal.mq.topic
RabbitMQ路由
# 数据地址,此处mysql,是因为canal和mysql是同一network下,可以使用容器名称代替具体ip
canal.instance.master.address=mysql:3306# username/password
canal.instance.dbUsername=root
canal.instance.dbPassword=root# mq config
canal.mq.topic=canal-routing-key
3.1 canal.properties 完整文件
#################################################
######### common argument #############
#################################################
canal.ip=
canal.register.ip=
canal.port=11111
canal.metrics.pull.port=11112
canal.admin.port=11110
canal.admin.user=admin
canal.admin.passwd=
canal.zkServers=
canal.zookeeper.flush.period=1000
canal.withoutNetty=false
canal.serverMode=rabbitMQ
canal.file.data.dir=${canal.conf.dir}
canal.file.flush.period=1000
canal.instance.memory.buffer.size=16384
canal.instance.memory.buffer.memunit=1024
canal.instance.memory.batch.mode=MEMSIZE
canal.instance.memory.rawEntry=true
canal.instance.detecting.enable=false
canal.instance.detecting.sql=select 1
canal.instance.detecting.interval.time=3
canal.instance.detecting.retry.threshold=3
canal.instance.detecting.heartbeatHaEnable=false
canal.instance.transaction.size=1024
canal.instance.fallbackIntervalInSeconds=60
canal.instance.network.receiveBufferSize=16384
canal.instance.network.sendBufferSize=16384
canal.instance.network.soTimeout=30
canal.instance.filter.druid.ddl=true
canal.instance.filter.query.dcl=false
canal.instance.filter.query.dml=false
canal.instance.filter.query.ddl=false
canal.instance.filter.table.error=false
canal.instance.filter.rows=false
canal.instance.filter.transaction.entry=false
canal.instance.filter.dml.insert=false
canal.instance.filter.dml.update=false
canal.instance.filter.dml.delete=false
canal.instance.binlog.format=ROW,STATEMENT,MIXED
canal.instance.binlog.image=FULL,MINIMAL,NOBLOB
canal.instance.get.ddl.isolation=false
canal.instance.parser.parallel=true
canal.instance.parser.parallelThreadSize = 16
canal.instance.parser.parallelBufferSize=256
canal.instance.tsdb.enable=true
canal.instance.tsdb.dir=${canal.file.data.dir:../conf}/${canal.instance.destination:}
canal.instance.tsdb.url=jdbc:h2:${canal.instance.tsdb.dir}/h2;CACHE_SIZE=1000;MODE=MYSQL;
canal.instance.tsdb.dbUsername=canal
canal.instance.tsdb.dbPassword=canal
canal.instance.tsdb.snapshot.interval=24
canal.instance.tsdb.snapshot.expire=360
#################################################
######### destinations #############
#################################################
canal.destinations=example
canal.conf.dir=../conf
canal.auto.scan=true
canal.auto.scan.interval=5
canal.auto.reset.latest.pos.mode=false
canal.instance.tsdb.spring.xml=classpath:spring/tsdb/h2-tsdb.xml
canal.instance.global.mode=spring
canal.instance.global.lazy=false
canal.instance.global.manager.address=${canal.admin.manager}
canal.instance.global.spring.xml=classpath:spring/file-instance.xml
##################################################
######### MQ Properties #############
##################################################
canal.aliyun.accessKey=
canal.aliyun.secretKey=
canal.aliyun.uid=
canal.mq.flatMessage=true
canal.mq.canalBatchSize=50
canal.mq.canalGetTimeout=100
canal.mq.accessChannel=local
canal.mq.database.hash=true
canal.mq.send.thread.size=30
canal.mq.build.thread.size=8
##################################################
######### Kafka #############
##################################################
kafka.bootstrap.servers=127.0.0.1:9092
kafka.acks=all
kafka.compression.type=none
kafka.batch.size=16384
kafka.linger.ms=1
kafka.max.request.size=1048576
kafka.buffer.memory=33554432
kafka.max.in.flight.requests.per.connection=1
kafka.retries=0
kafka.kerberos.enable=false
kafka.kerberos.krb5.file=../conf/kerberos/krb5.conf
kafka.kerberos.jaas.file=../conf/kerberos/jaas.conf
##################################################
######### RocketMQ #############
##################################################
rocketmq.producer.group=test
rocketmq.enable.message.trace=false
rocketmq.customized.trace.topic=
rocketmq.namespace=
rocketmq.namesrv.addr=127.0.0.1:9876
rocketmq.retry.times.when.send.failed=0
rocketmq.vip.channel.enabled=false
rocketmq.tag=
##################################################
######### RabbitMQ #############
##################################################
rabbitmq.host=rabbitmq
rabbitmq.virtual.host=/
rabbitmq.exchange=canal-exchange
rabbitmq.username=guest
rabbitmq.password=guest
rabbitmq.queue=canal-queue
rabbitmq.routingKey=canal-routing-key
rabbitmq.deliveryMode=
##################################################
######### Pulsar #############
##################################################
pulsarmq.serverUrl=
pulsarmq.roleToken=
pulsarmq.topicTenantPrefix=
3.2 instance.properties 完整文件
#################################################
## mysql serverId , v1.0.26+ will autoGen
# canal.instance.mysql.slaveId=0# enable gtid use true/false
canal.instance.gtidon=false# rds oss binlog
canal.instance.rds.accesskey=
canal.instance.rds.secretkey=
canal.instance.rds.instanceId=# position info
canal.instance.master.address=mysql:3306
canal.instance.master.journal.name=
canal.instance.master.position=
canal.instance.master.timestamp=
canal.instance.master.gtid=# multi stream for polardbx
canal.instance.multi.stream.on=false# ssl
#canal.instance.master.sslMode=DISABLED
#canal.instance.master.tlsVersions=
#canal.instance.master.trustCertificateKeyStoreType=
#canal.instance.master.trustCertificateKeyStoreUrl=
#canal.instance.master.trustCertificateKeyStorePassword=
#canal.instance.master.clientCertificateKeyStoreType=
#canal.instance.master.clientCertificateKeyStoreUrl=
#canal.instance.master.clientCertificateKeyStorePassword=# table meta tsdb info
canal.instance.tsdb.enable=true
#canal.instance.tsdb.url=jdbc:mysql://127.0.0.1:3306/canal_tsdb
#canal.instance.tsdb.dbUsername=canal
#canal.instance.tsdb.dbPassword=canal#canal.instance.standby.address =
#canal.instance.standby.journal.name =
#canal.instance.standby.position =
#canal.instance.standby.timestamp =
#canal.instance.standby.gtid=# username/password
canal.instance.dbUsername=root
canal.instance.dbPassword=admin123!@#
canal.instance.connectionCharset = UTF-8
# enable druid Decrypt database password
canal.instance.enableDruid=false
#canal.instance.pwdPublicKey=MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBALK4BUxdDltRRE5/zXpVEVPUgunvscYFtEip3pmLlhrWpacX7y7GCMo2/JM6LeHmiiNdH1FWgGCpUfircSwlWKUCAwEAAQ==# table regex
canal.instance.filter.regex=.*\\..*
# table black regex
canal.instance.filter.black.regex=mysql\\.slave_.*
# table field filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.field=test1.t_product:id/subject/keywords,test2.t_company:id/name/contact/ch
# table field black filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.black.field=test1.t_product:subject/product_image,test2.t_company:id/name/contact/ch# mq config
canal.mq.topic=canal-routing-key
# dynamic topic route by schema or table regex
#canal.mq.dynamicTopic=mytest1.user,topic2:mytest2\\..*,.*\\..*
canal.mq.partition=0
# hash partition config
#canal.mq.enableDynamicQueuePartition=false
#canal.mq.partitionsNum=3
#canal.mq.dynamicTopicPartitionNum=test.*:4,mycanal:6
#canal.mq.partitionHash=test.table:id^name,.*\\..*
#################################################
3.3 检查配置是否与宿主机一致
进入容器内部:docker exec -it canal bash
检查配置文件内容是否与宿主机一致:
cat /home/admin/canal-server/conf/canal.properties
cat /home/admin/canal-server/conf/example/instance.properties
3.4 开启相关端口防火墙配置
canal
:11110、11111、11112mysql
:3306redis
:6379RabbitMQ
: 15672、5672
四、代码实现
4.1 相关pom依赖引入
<parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.7.15</version><relativePath/> <!-- lookup parent from repository -->
</parent><!-- Spring Boot MQ 依赖 -->
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency><!-- canal -->
<dependency><groupId>com.alibaba.otter</groupId><artifactId>canal.client</artifactId><version>1.1.0</version>
</dependency>
4.2 完整pom.xml
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.neo</groupId><artifactId>code-repository</artifactId><version>1.0-SNAPSHOT</version><packaging>jar</packaging><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.7.15</version><relativePath/> <!-- lookup parent from repository --></parent><name>code-repository</name><properties><java.version>17</java.version><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><hutool.version>5.8.20</hutool.version><mysql.version>8.0.30</mysql.version><mybatis-plus.version>3.5.3.1</mybatis-plus.version><redis.version>3.1.0</redis.version><druid.version>1.2.16</druid.version><fastjson.version>1.2.83</fastjson.version><sa-token.version>1.37.0</sa-token.version></properties><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-mail</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-aop</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><dependency><groupId>org.apache.commons</groupId><artifactId>commons-pool2</artifactId></dependency><dependency><groupId>com.baomidou</groupId><artifactId>mybatis-plus-boot-starter</artifactId><version>${mybatis-plus.version}</version></dependency><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>${mysql.version}</version></dependency><dependency><groupId>com.alibaba</groupId><artifactId>druid-spring-boot-starter</artifactId><version>${druid.version}</version></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>${fastjson.version}</version></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><optional>true</optional></dependency><dependency><groupId>cn.hutool</groupId><artifactId>hutool-all</artifactId><version>${hutool.version}</version></dependency><dependency><groupId>cn.dev33</groupId><artifactId>sa-token-spring-boot-starter</artifactId><version>${sa-token.version}</version></dependency><dependency><groupId>cn.dev33</groupId><artifactId>sa-token-redis-jackson</artifactId><version>${sa-token.version}</version></dependency><dependency><groupId>com.alibaba.otter</groupId><artifactId>canal.client</artifactId><version>1.1.0</version></dependency></dependencies><build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId></plugin></plugins></build>
</project>
4.3 application.yml 配置
spring:rabbitmq:host: localhostport: 5672username: guestpassword: guestvirtual-host: /publisher-confirm-type: correlatedpublisher-returns: true
4.4 完整application.yml配置
server:port: 8088servlet:context-path: /apispring:datasource:type: com.alibaba.druid.pool.DruidDataSourcedriver-class-name: com.mysql.cj.jdbc.Driverurl: jdbc:mysql://localhost:3306/db_v1?useUnicode=true&characterEncoding=utf-8&useSSL=false&serverTimezone=Asia/Shanghaiusername: rootpassword: rootdruid:initial-size: 5min-idle: 5max-active: 20max-wait: 60000validation-query: SELECT 1test-while-idle: truestat-view-servlet:enabled: trueurl-pattern: /druid/*login-username: adminlogin-password: admin123web-stat-filter:enabled: trueurl-pattern: /*exclusions: "*.js,*.gif,*.jpg,*.png,*.css,*.ico,/druid/*"filter:stat:enabled: truelog-slow-sql: trueslow-sql-millis: 1000wall:enabled: trueconfig:drop-table-allow: falseredis:host: localhostport: 6379password: 123456database: 0timeout: 5000lettuce:pool:max-active: 8max-wait: -1max-idle: 8min-idle: 0mail:host: smtp.aliyun.comusername: password: port: 25properties:mail:smtp:auth: truestarttls:enable: truerequired: truerabbitmq:host: localhostport: 5672username: guestpassword: guestvirtual-host: /publisher-confirm-type: correlatedpublisher-returns: truemybatis-plus:mapper-locations: classpath*:mapper/*_Mapper.xmlglobal-config:db-config:logic-delete-field: delFlaglogic-delete-value: 1logic-not-delete-value: 0configuration:log-impl: org.apache.ibatis.logging.stdout.StdOutImpl
4.5 RabbitConstants 基础常量配置
public interface RabbitConstants {interface Canal {String QUEUE = "canal-queue";String EXCHANGE = "canal-exchange";String ROUTING = "canal-routing-key";}interface EventType {String INSERT = "INSERT";String UPDATE = "UPDATE";String DELETE = "DELETE";}}
4.6 CanalMqConfigure MQ队列交换机配置
@Configuration
public class CanalMqConfigure {@Beanpublic Queue queue() {return new Queue(RabbitConstants.Canal.QUEUE, true);}@Beanpublic DirectExchange directExchange() {return new DirectExchange(RabbitConstants.Canal.EXCHANGE, true, false);}@Beanpublic Binding bindingCanal() {return BindingBuilder.bind(queue()).to(directExchange()).with(RabbitConstants.Canal.ROUTING);}
}
4.7 CanalConsumer 消费者
package com.neo.core.canal;import com.neo.core.constant.RabbitConstants;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;import java.util.List;
import java.util.Map;@Slf4j
@Component
@RabbitListener(queues = RabbitConstants.Canal.QUEUE)
public class CanalConsumer {@RabbitHandlerpublic void execute(Map<String, Object> msg) {log.info("canal消息监听事件触发,消息内容:{}", msg);boolean isDdl = (boolean) msg.get("isDdl");if (isDdl) {return;}String database = (String) msg.get("database");String table = (String) msg.get("table");String type = (String) msg.get("type");List<?> data = (List<?>) msg.get("data");log.info("database:{}.table:{}", database, table);if (RabbitConstants.EventType.INSERT.equalsIgnoreCase(type)) {System.out.println("INSERT");} else if (RabbitConstants.EventType.UPDATE.equalsIgnoreCase(type)) {System.out.println("UPDATE");} else if (RabbitConstants.EventType.DELETE.equalsIgnoreCase(type)) {System.out.println("DELETE");} else {// 其他事件}}
}
五、运行与测试
当MySQL数据出现变动后,会触发canal-queue的监听事件,后续可根据具体业务逻辑实现业务处理。
25-03-10.16:51:28.431 [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-1] INFO CanalConsumer - canal消息监听事件触发,消息内容:{data=[{id=2, account=test, password=9c79685ab5ca920a187d94688d6f1845, salt=7c2b6b0caf0b4174816e2a4bf5f05cba, email=test@qq.com, nick_name=test1, enabled=1, create_by=0, create_time=2025-03-08 22:22:00, update_by=0, update_time=2025-03-08 22:22:01, del_flag=0}], database=db_v1, es=1741596660000, id=5, isDdl=false, mysqlType={id=bigint, account=varchar(128), password=varchar(128), salt=varchar(128), email=varchar(64), nick_name=varchar(32), enabled=tinyint(1), create_by=bigint, create_time=datetime, update_by=bigint, update_time=datetime, del_flag=tinyint(1)}, old=null, pkNames=[id], sql=, sqlType={id=-5, account=12, password=12, salt=12, email=12, nick_name=12, enabled=-6, create_by=-5, create_time=93, update_by=-5, update_time=93, del_flag=-6}, table=sys_user, ts=1741596660932, type=DELETE}
25-03-10.16:51:28.431 [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-1] INFO CanalConsumer - database:db_v1.table:sys_user
DELETE
25-03-10.16:51:31.523 [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-1] INFO CanalConsumer - canal消息监听事件触发,消息内容:{data=[{id=2, account=test, password=9c79685ab5ca920a187d94688d6f1845, salt=7c2b6b0caf0b4174816e2a4bf5f05cba, email=test@qq.com, nick_name=test, enabled=1, create_by=0, create_time=2025-03-08 22:22:00, update_by=0, update_time=2025-03-08 22:22:01, del_flag=0}], database=db_v1, es=1741596663000, id=6, isDdl=false, mysqlType={id=bigint, account=varchar(128), password=varchar(128), salt=varchar(128), email=varchar(64), nick_name=varchar(32), enabled=tinyint(1), create_by=bigint, create_time=datetime, update_by=bigint, update_time=datetime, del_flag=tinyint(1)}, old=null, pkNames=[id], sql=, sqlType={id=-5, account=12, password=12, salt=12, email=12, nick_name=12, enabled=-6, create_by=-5, create_time=93, update_by=-5, update_time=93, del_flag=-6}, table=sys_user, ts=1741596664038, type=INSERT}
25-03-10.16:51:31.523 [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-1] INFO CanalConsumer - database:db_v1.table:sys_user
INSERT
25-03-10.16:51:36.030 [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-1] INFO CanalConsumer - canal消息监听事件触发,消息内容:{data=[{id=2, account=test1, password=9c79685ab5ca920a187d94688d6f1845, salt=7c2b6b0caf0b4174816e2a4bf5f05cba, email=test@qq.com, nick_name=test, enabled=1, create_by=0, create_time=2025-03-08 22:22:00, update_by=0, update_time=2025-03-08 22:22:01, del_flag=0}], database=db_v1, es=1741596668000, id=7, isDdl=false, mysqlType={id=bigint, account=varchar(128), password=varchar(128), salt=varchar(128), email=varchar(64), nick_name=varchar(32), enabled=tinyint(1), create_by=bigint, create_time=datetime, update_by=bigint, update_time=datetime, del_flag=tinyint(1)}, old=[{account=test}], pkNames=[id], sql=, sqlType={id=-5, account=12, password=12, salt=12, email=12, nick_name=12, enabled=-6, create_by=-5, create_time=93, update_by=-5, update_time=93, del_flag=-6}, table=sys_user, ts=1741596668545, type=UPDATE}
25-03-10.16:51:36.030 [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-1] INFO CanalConsumer - database:db_v1.table:sys_user
UPDATE
以上是RabbitMQ+Canal数据一致性的完整解决方案,包括环境配置、代码实现以及运行测试等环节,确保了数据在不同系统间的一致性和可靠性。
相关文章:

Spring Boot+RabbitMQ+Canal 解决数据一致性
目录大纲 一、环境配置1.1 docker-compose.yml 配置1.2 docker-compose 常用命令1.3 镜像服务启动状态 二、MySQL binlog 配置2.1 docker-compose command 配置 binlog2.2 创建canal用户,以及查看是否开启binlog 三、canal 相关配置文件3.1 canal.properties 完整文…...

Java高频面试之集合-08
hello啊,各位观众姥爷们!!!本baby今天来报道了!哈哈哈哈哈嗝🐶 面试官:详细说说CopyOnWriteArrayList CopyOnWriteArrayList 详解 CopyOnWriteArrayList 是 Java 并发包(java.util…...

C#实现高性能异步文件下载器(支持进度显示/断点续传)
一、应用场景分析 异步文件下载器用处很大,当我们需要实现以下功能时可以用的上: 大文件下载(如4K视频/安装包) 避免UI线程阻塞,保证界面流畅响应多任务并行下载 支持同时下载多个文件,提升带宽利用率后台…...

【数据分析】转录组基因表达的KEGG通路富集分析教程
禁止商业或二改转载,仅供自学使用,侵权必究,如需截取部分内容请后台联系作者! 文章目录 介绍差异分析(limma)KEGG富集分析(enrichKEGG)可视化加载R包数据下载导入数据基因差异分析火山图KEGG通路富集分析可视化通路结果另一个案例总结系统信息参考介绍 KEGG富集分析,可…...

【由技及道】API契约的量子纠缠术:响应封装的十一维通信协议(全局的返回结果封装)【人工智障AI2077的开发日志012】
摘要:在API通信的量子混沌中,30种返回格式如同平行宇宙的物理定律相互碰撞。本文构建的十一维通信协议,通过时空锚点(ApiResult)、量子过滤器(ResponseWrapper)和湮灭防护罩(Jackson…...

STM32 ——系统架构
3个被动单元 SRAM 存储程序运行时用到的变量 Flash(内部闪存存储器) 存储下载的程序 程序执行时用到的常量 桥接1和桥接2 AHB到APB的桥(AHBtoAPBx) 桥1 通过APB2总线连接到APB2上的外设。 高速外设,最高72MHz。 桥2 通过…...

算法 之 树形dp 树的中心、重心
文章目录 重心实践题目小红的陡峭值 在树的算法中,求解树的中心和重心是一类十分重要的算法 求解树的重心 树的重心的定义:重心是树中的一个节点,如果将这个点删除后,剩余各个连通块中点数的最大值最小,那么这个节点…...

如何利用 Excel 表格实现精准文件批量重命名教程
在处理大量文件时,有时需要根据特定规则对文件名进行调整。如果您的文件名和新名称之间存在一对多的关系,并且这种关系可以通过 Excel 表格来管理,那么使用“简鹿文件批量重命名”软件中的“匹配对应名称命名”功能将是一个高效的选择。接下来…...

ACE协议学习1
在多核系统或复杂SoC(System on Chip)中,不同处理器核心或IP(Intellectual Property)模块之间需要保持数据的一致性。常用的是ACE协议or CHI。 先对ACE协议进行学习 ACE协议(Advanced Microcontroller Bu…...

【实战ES】实战 Elasticsearch:快速上手与深度实践-5.1.1热点分片识别与均衡策略
👉 点击关注不迷路 👉 点击关注不迷路 👉 点击关注不迷路 文章大纲 5.1.1 Filebeat Logstash ES Kibana 全链路配置实1. 架构设计与组件选型1.1 技术栈对比分析1.2 硬件配置推荐 2. Filebeat 高级配置2.1 多输入源配置2.2 性能优化参数 3.…...

Kubernetes 的正式安装
1.基础的网络结构说明 软件路由器 ikuai 当然同一个仅主机模式 相当于在 同一个我们所谓的广播域内 所以相当于它们的几张网卡 是被连接起来的 为了防止出现问题 我们可以把第二块网卡临时关闭一下 2.准备路由器 ikuai 爱快 iKuai-商业场景网络解决方案提供商 (ikuai8.com)…...

初阶数据结构(C语言实现)——4.2队列
目录 2.队列2.1队列的概念及结构2.2队列的实现2.2.1 初始化队列2.2.2 销毁队列2.2.3 队尾入队列2.2.4 队头出队列2.2.5获取队列头部元素2.2.6 获取队列队尾元素2.2.7获取队列中有效元素个数2.2.8 检测队列是否为空,如果为空返回非零结果,如果非空返回0 3…...

Mysql主从复制和Mysql高可用以及负载均衡配置
需要先配置MySQL主从复制,然后再在主MySQL服务器上配置MySQL Router。以下是详细说明和步骤: 1. 为什么需要先配置MySQL主从复制? MySQL主从复制是MySQL高可用性和负载均衡的基础,通过将数据从主服务器实时同步到从服务器&#…...

c#中使用时间戳转换器
在C#中,时间戳转换器通常用于将时间戳(通常是一个表示自某一特定时间点(如1970年1月1日UTC)以来的毫秒数的长整型值)转换为DateTime对象,或者将DateTime对象转换回时间戳。以下是几种实现这一功能的方法: 1. 使用DateTime的构造函数 将时间戳转换为DateTime long tim…...

杂项知识笔记搜集
1.pygame pygame可以画出来图形界面,pygame Python仓库 PyGame游戏编程_游戏程序设计csdn-CSDN博客 2.V4L2库 V4L2是Linux上的Camera采集器的框架 Video for Linux ,是从Linux2.1版本开始支持的。HDMI视频采集卡采集到的视频通过USB3.0输出࿰…...

rust语言match模式匹配涉及转移所有权Error Case
struct S{data:String, }//注意:因为String默认是移动语义,从而决定结构体S也是移动语义,可采用(1)或(2)两种方法解决编译错误;关键思路:放弃获取结构体S的字段data的所有权,改为借用。fn process(s_ref:&a…...

golang从入门到做牛马:第十一篇-Go语言变量作用域:变量的“生活圈”
在Go语言中,变量的作用域决定了它在程序中的可见性和生命周期。理解变量的作用域对于编写清晰、高效的代码至关重要。Go语言中的变量可以在三个主要地方声明:函数内、函数外和函数定义中。接下来,让我们深入探讨局部变量、全局变量和形式参数的作用域。 局部变量:函数内的“…...

【Linux】37.网络版本计算器
文章目录 1. Log.hpp-日志记录器2. Daemon.hpp-守护进程工具3. Protocol.hpp-通信协议解析器4. ServerCal.hpp-计算器服务处理器5. Socket.hpp-Socket通信封装类6. TcpServer.hpp-TCP服务器框架7. ClientCal.cc-计算器客户端8. ServerCal.cc-计算器服务器9. 代码时序1. 服务器启…...

linux安装Mariadb10.5并修改端口
首先配置yum源 进入下方的文件进行配置 vim /etc/yum.repos.d/MariaDB.repo填写下方内容 [mariadb]name MariaDBbaseurl https:///mirrors.aliyun.com/mariadb/yum/10.5/centos8-amd64/gpgkeyhttps:///mirrors.aliyun.com/mariadb/yum/RPM-GPG-KEY-MariaDBmodule_hotfixes…...

从Windows到ARM Linux:Qt程序的交叉编译与移植指南
引言 在嵌入式开发中,我们经常需要将桌面端开发的Qt程序部署到ARM架构的Linux设备。本文详细介绍如何将Windows平台开发的Qt程序,通过Linux虚拟机交叉编译为ARM架构可执行文件的完整过程 环境准备 需要特别注意的是,对于CentOS 7 默认支持…...

【微信小程序】uniapp开发微信小程序
uniapp开发微信小程序 1、上拉加载 下拉刷新 import { onReachBottom, onPullDownRefresh } from dcloudio/uni-app;配置允许下拉刷新: {"path" : "pages/pet/pet","style" : {"navigationBarTitleText" : ""…...

多视图几何--结构恢复--三角测量
三角测量 1. 核心公式推导 假设两个相机的投影矩阵为 P P P 和 P ′ P P′,对应的匹配图像点(同名点)为 ( u , v ) (u, v) (u,v) 和 ( u ′ , v ′ ) (u, v) (u′,v′),目标是求解三维点 X [ X x , X y , X z , 1 ] T X [X_x, X_y, X_z, 1]^T X…...

【Linux三剑客】awk命令使用
AWK 编程语言中的变量 AWK 提供了许多可在模式和操作中使用的内置变量。最常用的变量是 - NR - 表示当前记录(行)号 NF - 表示输入记录中的字段总数。 $0 - 整个当前记录。 $1, $2, $3, … - 当前记录中的第一个、第二个、第三个…字段。 查找passwd中…...

Python CATIA二次开发实战:CATIA产品号批量同步文件名工具开发
引言 在汽车/航空制造领域,CATIA文件的结构化管理直接影响着PLM系统数据一致性。笔者近期开发的增强型产品号同步工具,成功解决了工程实践中文件名与产品名称不同步的痛点问题。本文将从技术实现、功能亮点、应用场景三个维度进行深度解析。 一、技术方…...

我的两个医学数据分析技术思路
我的两个医学数据分析技术思路 从临床上获得的或者公共数据库数据这种属于观察性研究,是对临床诊疗过程中自然产生的数据进行分析而获得疾病发生发展的规律等研究成果。再细分,可以分为独立危险因素鉴定和预测模型构建两种。 独立危险因素鉴定是一直以…...

操作系统之进程状态、优先级和切换与调度
文章目录 1. 进程状态1.1 课本名词提炼1.2 运行&阻塞&挂起1.2.1 运行1.2.2 阻塞1.2.3 挂起 1.3 理解内核链表1.4 Linux中的内核解释1.5 进程状态的查看1.6 Z(zombie)——僵尸进程1.6.1 创建僵尸进程1.6.2 僵尸进程的危害 1.7 孤儿进程 2. 进程优先级2.1 基本概念2.2 查…...

[免费]微信小程序(图书馆)自习室座位预约管理系统(SpringBoot后端+Vue管理端)(高级版)【论文+源码+SQL脚本】
大家好,我是java1234_小锋老师,看到一个不错的微信小程序(图书馆)自习室座位预约管理系统(SpringBoot后端Vue管理端)(高级版),分享下哈。 项目视频演示 【免费】微信小程序(图书馆)自习室座位预约管理系统(SpringBoot后端Vue管理端)(高级版…...

你使用过哪些 Java 并发工具类?
你的回答(口语化,面试场景) 面试官:你使用过哪些 Java 并发工具类? 你: 好的,我结合项目经验来说说常用的并发工具类: CountDownLatch 作用:等所有线程就绪后再触发任务…...

模板方法模式的C++实现示例
核心思想 模板方法设计模式是一种行为设计模式,它定义了一个算法的框架,并将某些步骤的具体实现延迟到子类中。通过这种方式,模板方法模式允许子类在不改变算法结构的情况下重新定义算法的某些步骤。 模板方法模式的核心在于: …...

国产编辑器EverEdit - 脚本(解锁文本编辑的无限可能)
1 脚本 1.1 应用场景 脚本是一种功能扩展代码,用于提供一些编辑器通用功能提供不了的功能,帮助用户在特定工作场景下提高工作效率,几乎所有主流的编辑器、IDE都支持脚本。 EverEdit的脚本支持js(语法与javascript类似)、VBScript两种编程…...