当前位置: 首页 > news >正文

Canal自定义客户端

一、背景

在Canal推送数据变更信息至MQ(消息队列)时,我们遇到了特定问题,尤其是当消息体的大小超过了MQ所允许的最大限制。这种限制导致数据推送过程受阻,需要相应的调整或处理。

二、解决方法

采用Canal自定义客户端的方式,将接收到的消息进行压缩处理,并将消息转换为MQ消息格式。经过处理的消息将被推送到原有的MQ主题,与原有消息处理兼容,从而在提升数据处理效率的同时,确保既有的消费者逻辑得以保持不变,实现平滑过渡与升级。

三、例子

<!-- canal 客户端的集成 -->
<dependency><groupId>com.alibaba.otter</groupId><artifactId>canal.client</artifactId><version>1.1.5</version>
</dependency>
<!-- 客户端通信 消息传递 -->
<dependency><groupId>com.alibaba.otter</groupId><artifactId>canal.protocol</artifactId><version>1.1.5</version>
</dependency>
package cn.ewan.skyeye.log.audit.support.canal;import cn.hutool.json.JSONUtil;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.FlatMessage;
import com.alibaba.otter.canal.protocol.Message;
import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.net.InetSocketAddress;
import java.util.*;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;/*** @Description:* @Author: wulm* @Date: 2024/6/14 10:11* @Version:1.0*/
public class CustomCanalClient {private final CanalConnector connector;private final String FILTER_REGEX = "test_db.(test_table)";public CustomCanalClient(String destination, String host, int port) {connector = CanalConnectors.newSingleConnector(new InetSocketAddress(host, port),destination, "", "");connector.connect();connector.subscribe(FILTER_REGEX);}public void listen() throws InterruptedException {while (true) {// 一次获取的变更事件数量int batchSize = 1000;Message message = connector.getWithoutAck(batchSize);long batchId = message.getId();EntryRowData[] entryRowData = buildMessageData(message);if (entryRowData == null) {continue;}List<FlatMessage> flatMessages = messageConverter(entryRowData, batchId);for (FlatMessage flatMessage : flatMessages){System.out.println(compress(flatMessage));}System.out.println("============FlatMessage: " + JSONUtil.toJsonStr(flatMessages));// 确认消息已被处理connector.ack(batchId);// 暂停一秒,避免循环过快Thread.sleep(1000);}}public static class EntryRowData {public CanalEntry.Entry entry;public CanalEntry.RowChange rowChange;}public static EntryRowData[] buildMessageData(Message message) {if (message.isRaw()) {List<ByteString> rawEntries = message.getRawEntries();final EntryRowData[] datas = new EntryRowData[rawEntries.size()];int i = 0;for (ByteString byteString : rawEntries) {final int index = i;try {CanalEntry.Entry entry = CanalEntry.Entry.parseFrom(byteString);CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());datas[index] = new EntryRowData();datas[index].entry = entry;datas[index].rowChange = rowChange;} catch (InvalidProtocolBufferException e) {throw new RuntimeException(e);}i++;}return datas;} else {final EntryRowData[] datas = new EntryRowData[message.getEntries().size()];int i = 0;for (CanalEntry.Entry entry : message.getEntries()) {final int index = i;try {CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());datas[index] = new EntryRowData();datas[index].entry = entry;datas[index].rowChange = rowChange;} catch (InvalidProtocolBufferException e) {throw new RuntimeException(e);}i++;}return datas;}}public static List<FlatMessage> messageConverter(EntryRowData[] datas, long id) {List<FlatMessage> flatMessages = new ArrayList<>();for (EntryRowData entryRowData : datas) {CanalEntry.Entry entry = entryRowData.entry;CanalEntry.RowChange rowChange = entryRowData.rowChange;// 如果有分区路由,则忽略begin/end事件if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN|| entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {continue;}// build flatMessageCanalEntry.EventType eventType = rowChange.getEventType();FlatMessage flatMessage = new FlatMessage(id);flatMessages.add(flatMessage);flatMessage.setDatabase(entry.getHeader().getSchemaName());flatMessage.setTable(entry.getHeader().getTableName());flatMessage.setIsDdl(rowChange.getIsDdl());flatMessage.setType(eventType.toString());flatMessage.setEs(entry.getHeader().getExecuteTime());flatMessage.setTs(System.currentTimeMillis());flatMessage.setSql(rowChange.getSql());
//            flatMessage.setGtid(entry.getHeader().getGtid());if (!rowChange.getIsDdl()) {Map<String, Integer> sqlType = new LinkedHashMap<>();Map<String, String> mysqlType = new LinkedHashMap<>();List<Map<String, String>> data = new ArrayList<>();List<Map<String, String>> old = new ArrayList<>();Set<String> updateSet = new HashSet<>();boolean hasInitPkNames = false;for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {if (eventType != CanalEntry.EventType.INSERT && eventType != CanalEntry.EventType.UPDATE&& eventType != CanalEntry.EventType.DELETE) {continue;}Map<String, String> row = new LinkedHashMap<>();List<CanalEntry.Column> columns;if (eventType == CanalEntry.EventType.DELETE) {columns = rowData.getBeforeColumnsList();} else {columns = rowData.getAfterColumnsList();}for (CanalEntry.Column column : columns) {if (!hasInitPkNames && column.getIsKey()) {flatMessage.addPkName(column.getName());}sqlType.put(column.getName(), column.getSqlType());mysqlType.put(column.getName(), column.getMysqlType());if (column.getIsNull()) {row.put(column.getName(), null);} else {row.put(column.getName(), column.getValue());}// 获取update为true的字段if (column.getUpdated()) {updateSet.add(column.getName());}}hasInitPkNames = true;if (!row.isEmpty()) {data.add(row);}if (eventType == CanalEntry.EventType.UPDATE) {Map<String, String> rowOld = new LinkedHashMap<>();for (CanalEntry.Column column : rowData.getBeforeColumnsList()) {if (updateSet.contains(column.getName())) {if (column.getIsNull()) {rowOld.put(column.getName(), null);} else {rowOld.put(column.getName(), column.getValue());}}}// update操作将记录修改前的值old.add(rowOld);}}if (!sqlType.isEmpty()) {flatMessage.setSqlType(sqlType);}if (!mysqlType.isEmpty()) {flatMessage.setMysqlType(mysqlType);}if (!data.isEmpty()) {flatMessage.setData(data);}if (!old.isEmpty()) {flatMessage.setOld(old);}}}return flatMessages;}private static void printColumn(List<CanalEntry.Column> columns) {for (CanalEntry.Column column : columns) {System.out.println(column.getName() + " : " + column.getValue());}}private String compress(FlatMessage flatMessage) {String decompressedJsonString = null;try {String jsonString = JSONUtil.toJsonStr(flatMessage);// 压缩JSON字符串ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();GZIPOutputStream gzipOutputStream = new GZIPOutputStream(byteArrayOutputStream);try {gzipOutputStream.write(jsonString.getBytes("UTF-8"));} finally {gzipOutputStream.close();}byte[] compressedData = byteArrayOutputStream.toByteArray();// 解压缩数据并还原为JSON字符串ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(compressedData);GZIPInputStream gzipInputStream = new GZIPInputStream(byteArrayInputStream);ByteArrayOutputStream decompressedOutputStream = new ByteArrayOutputStream();try {byte[] buffer = new byte[1024];int len;while ((len = gzipInputStream.read(buffer)) != -1) {decompressedOutputStream.write(buffer, 0, len);}} finally {gzipInputStream.close();}decompressedJsonString = decompressedOutputStream.toString("UTF-8");} catch (Exception e) {e.printStackTrace();}return decompressedJsonString;}public static void main(String[] args) throws InterruptedException {CustomCanalClient client = new CustomCanalClient("example", "127.0.0.1", 11111);client.listen();}
}

相关文章:

Canal自定义客户端

一、背景 在Canal推送数据变更信息至MQ&#xff08;消息队列&#xff09;时&#xff0c;我们遇到了特定问题&#xff0c;尤其是当消息体的大小超过了MQ所允许的最大限制。这种限制导致数据推送过程受阻&#xff0c;需要相应的调整或处理。 二、解决方法 采用Canal自定义客户…...

20240621将需要自启动的部分放到RK3588平台的Buildroot系统的rcS文件中

20240621将需要自启动的部分放到RK3588平台的Buildroot系统的rcS文件中 2024/6/21 17:15 开发板&#xff1a;飞凌OK3588-C SDK&#xff1a;Rockchip原厂的Buildroot 缘起&#xff1a;在凌OK3588-C的LINUX R4系统启动的时候&#xff0c;需要拉高GPIO4_B5、GPIO3_B7和GPIO3_D0。…...

掌握数据魔方:Xinstall引领ASA全链路数据归因新纪元

一、引言 在数字化时代&#xff0c;数据是App推广和运营的核心驱动力。然而&#xff0c;如何准确获取、分析并应用这些数据&#xff0c;却成为了许多开发者和营销人员面临的痛点。Xinstall作为一款专业的App全渠道统计服务商&#xff0c;致力于提供精准、高效的数据解决方案&a…...

IIS代理配置-反向代理

前后端分离项目&#xff0c;前端在开发中使用proxy代理解决跨域问题&#xff0c;打包之后无效。 未配置前无法访问 部署环境为windows IIS&#xff0c;要在iis设置反向代理 安装代理模块 需要在iis中实现代理&#xff0c;需要安装Application Request Routing Cache和URL重…...

Flutter调用本地web

前言: 在目前Flutter 环境中&#xff0c;使用在线 webview 是一种很常见的行为 而在 app 环境中&#xff0c;离线使用则更有必要 1.环境准备 将依赖导入 2.引入前端代码 前端代码有两种情况 一种是使用打包工具 build 而来的前端代码 另一种情况是直接使用 HTML 文件 …...

AI大模型部署Ubuntu服务器攻略

一、下载Ollama 在线安装&#xff1a; 在linux中输入命令curl -fsSL https://ollama.com/install.sh | sh 由于在linux下载ollama需要经过外网&#xff0c;网络会不稳定&#xff0c;很容易造成连接超时的问题。 离线安装&#xff1a; 步骤一&#xff1a; 下载Ollama离线版本…...

vlan、vxlan、vpc学习

文章目录 前言VLAN (Virtual Local Area Network)定义工作原理优点应用场景限制 VXLAN (Virtual eXtensible Local Area Network)工作原理优点应用场景与VLAN的区别 VPC (Virtual Private Cloud)定义特点优势应用场景与VLAN/VXLAN的关联 总结 前言 VLAN&#xff08;Virtual Lo…...

低代码开发:加速工业数智化转型发展

引言 在当今全球经济一体化和信息化的深度融合的大环境下&#xff0c;工业数智化转型已经成为推动制造业高质量发展的关键因素。这一转型不仅涉及生产过程的智能化、网络化&#xff0c;还涉及到企业管理、市场服务等全方位的数字化升级&#xff0c;其最终目标是为了实现更高效能…...

python“__main__“的解读

Tutorial Gross tutorial 有些模块包含了仅供脚本使用的代码&#xff0c;比如解析命令行参数或从标准输入获取数据。 如果这样的模块被从不同的模块中导入&#xff0c;例如为了单元测试&#xff0c;脚本代码也会无意中执行。 这就是 if name ‘main’ 代码块的用武之地。除非…...

Linux Debian12使用podman安装pikachu靶场环境

一、pikachu简介 Pikachu是一个带有漏洞的Web应用系统&#xff0c;在这里包含了常见的web安全漏洞。 二、安装podman环境 Linux Debian系统如果没有安装podman容器环境&#xff0c;可以参考这篇文章先安装podman环境&#xff0c; Linux Debian11使用国内源安装Podman环境 三…...

跑通并使用Yolo v5的源代码并进行训练—目标检测

跑通并使用Yolo v5的源代码并进行训练 摘要&#xff1a;yolo作为目标检测计算机视觉领域的核心网络模型&#xff0c;虽然到24年已经出到了v10的版本&#xff0c;但也很有必要对之前的核心版本v5版本进行进一步的学习。在学习yolo v5的时候因为缺少论文所以要从源代码入手来体验…...

需求虽小但是问题很多,浅谈JavaScript导出excel文件

最近我在进行一些前端小开发&#xff0c;遇到了一个小需求&#xff1a;我想要将数据导出到 Excel 文件&#xff0c;并希望能够封装成一个函数来实现。这个函数需要接收一个二维数组作为参数&#xff0c;数组的第一行是表头。在导出的过程中&#xff0c;要能够确保避免出现中文乱…...

phar反序列化及绕过

目录 一、什么是phar phar://伪协议格式&#xff1a; 二、phar结构 1.stub phar&#xff1a;文件标识。 格式为 xxx; *2、manifest&#xff1a;压缩文件属性等信息&#xff0c;以序列化存 3、contents&#xff1a;压缩文件的内容。 4、signature&#xff1a;签名&#…...

汽车IVI中控开发入门及进阶(三十):视频图像滚动问题分析(imx6+TVP5150+Camera)

前言: DA主控SOC采用imx6,TVP5150作为camera摄像头视频的解码decode芯片,imx6采用linux系统。 关于imx6,请参阅:汽车IVI中控开发入门及进阶(二十九):i.MX6-CSDN博客 Contributor III:...

给PDF添加书签的通解-姜萍同款《偏微分方程》改造手记

背景 网上找了一本姜萍同款的《偏微分方程》&#xff0c;埃文斯&#xff0c;英文版&#xff0c;可惜没有书签&#xff0c;洋洋七百多页&#xff0c;没有书签&#xff0c;怎么读&#xff1f;用福昕编辑器自然能手工一个个加上&#xff0c;可是劳神费力&#xff0c;非程序员所为…...

在寻找电子名片在线制作免费生成?5个软件帮助你快速制作电子名片

在寻找电子名片在线制作免费生成&#xff1f;5个软件帮助你快速制作电子名片 当你需要快速制作电子名片时&#xff0c;有几款免费在线工具可以帮助你实现这个目标。这些工具提供了丰富的设计模板和元素&#xff0c;让你可以轻松地创建个性化、专业水平的电子名片。 1.一键logo…...

Github 2024-06-16 php开源项目日报 Top10

根据Github Trendings的统计,今日(2024-06-16统计)共有10个项目上榜。根据开发语言中项目的数量,汇总情况如下: 开发语言项目数量PHP项目10Livewire: Laravel中构建动态UI组件的全栈框架 创建周期:1818 天开发语言:PHP协议类型:MIT LicenseStar数量:21388 个Fork数量:1…...

docker将容器打包提交为镜像,再打包成tar包

将容器打包成镜像可以通过以下步骤来实现。这里以 Docker 为例&#xff0c;假设你已经安装了 Docker 并且有一个正在运行的容器。 1. 找到正在运行的容器 首先&#xff0c;你需要找到你想要打包成镜像的容器的 ID 或者名字。可以使用以下命令查看所有正在运行的容器&#xff…...

洛阳水利乙级资质企业在水利科技创新中的作用

洛阳水利乙级资质企业在水利科技创新中扮演着重要的角色&#xff0c;其贡献主要体现在以下几个方面&#xff1a; 一、技术引进与研发 引进先进技术&#xff1a;洛阳水利乙级资质企业积极引进国内外先进的水利工程技术和管理经验&#xff0c;结合本地实际情况&#xff0c;形成独…...

Redis-事务-基本操作-在执行阶段出错不会回滚

文章目录 1、Redis事务控制命令2、Redis事务错误处理3、Redis事务错误处理&#xff0c;在执行阶段出错不会回滚 1、Redis事务控制命令 127.0.0.1:6379> keys * (empty array) 127.0.0.1:6379> multi OK 127.0.0.1:6379(TX)> set a1 v1 QUEUED 127.0.0.1:6379(TX)>…...

aws的alb,多个域名绑定多个网站实践

例如首次创建的alb负载均衡只有www.xxx.com 需要添加 负载 test2.xxx.com aws的Route 53产品解析到负载均衡 www.xxx.com 添加CNAME&#xff0c;到负载均衡的dns字段axx test2.xxx.com 添加CNAME&#xff0c;到负载均衡的dns字段axx 主要介绍目标组和规则 创建alb就不介…...

WPF/C#:数据绑定到方法

在WPF Samples中有一个关于数据绑定到方法的Demo&#xff0c;该Demo结构如下&#xff1a; 运行效果如下所示&#xff1a; 来看看是如何实现的。 先来看下MainWindow.xaml中的内容&#xff1a; <Window.Resources><ObjectDataProvider ObjectType"{x:Type local…...

GBDT算法详解

GBDT算法详解 梯度提升决策树&#xff08;Gradient Boosting Decision Trees&#xff0c;GBDT&#xff09;是机器学习中一种强大的集成算法。它通过构建一系列的决策树&#xff0c;并逐步优化模型的预测能力&#xff0c;在各种回归和分类任务中取得了显著的效果。本文将详细介…...

51单片机宏定义的例子

代码 demo.c #include "hardware.h"void delay() {volatile unsigned int n;for(n 0; n < 50000; n); }int main(void) {IO_init();while(1){PINSET(LED);delay();PINCLR(LED);delay();}return 0; }cfg.h #ifndef _CFG_H_ #define _CFG_H_// #define F_CPU …...

香港云服务器怎么处理高并发和突发流量?

处理香港云服务器的高并发和突发流量需要综合考虑多种因素&#xff0c;包括服务器配置优化、负载均衡、缓存策略、CDN加速以及监控和自动化调整等措施。以下是处理高并发和突发流量的一些关键步骤和建议&#xff1a; 1. 优化服务器配置 选择高性能实例&#xff1a;根据预期的并…...

c,c++,qt从入门到地狱

前言 1 你所能用的正与你手写的效率相同2 你不需要为你没有用到的特性付出 (无脑的调用函数or公式的空壳人类请出门右转)c 001 scanf and strcpy "_s"bug? 微软官方说明1 Visual Studio 库中的许多函数、成员函数、函数模板和全局变量已弃用,改用微软新增的强化函数…...

iptables(6)扩展匹配条件--tcp-flags、icmp

简介 前面我们已经介绍了不少的扩展模块,例如multiport、iprange、string、time、connlimit模块,但是在tcp扩展模块中只介绍了tcp扩展模块中的”--sport”与--dport”选项,并没有介绍”--tcp-flags”选项,那么这篇文章,我们就来认识一下tcp扩展模块中的”--tcp-flags”和i…...

C#-Json文件的读写

文章速览 命名空间读取Json核心代码示例 写入Json核心代码示例 坚持记录实属不易&#xff0c;希望友善多金的码友能够随手点一个赞。 共同创建氛围更加良好的开发者社区&#xff01; 谢谢~ 命名空间 using Newtonsoft.Json;读取Json 核心代码 //核心代码using (StreamReader…...

【2023级研究生《人工智能》课程考试说明】

一&#xff0e;试题范围 考试题共包括4道大题&#xff1a; 第一大题&#xff1a;分类和回归----&#xff08;8选1&#xff09; 第二大题&#xff1a;降维和聚类----&#xff08;7选1&#xff09; 第三大题&#xff1a;API调用&#xff08;课程中学习过的所有云平台&#xff09…...

C语言队列操作及其安全问题

在C语言中&#xff0c;队列是一种常用的数据结构&#xff0c;特别适用于嵌入式开发中的任务调度、缓冲区管理等场景。下面是一个简单的循环队列的模板代码&#xff0c;它使用数组来实现队列&#xff0c;并提供了基本的入队&#xff08;enqueue&#xff09;和出队&#xff08;de…...

javaweb可以做网站吗/品牌营销策划方案怎么做

1、打开激活窗口 2、选择 Activate new license with License server &#xff08;用license server 激活&#xff09; 3、在 License sever address 处填入 https://jetlicense.nss.im/ 4、点击 Activate 进行认证 还有一种方法比较麻烦&#xff0c;就不提及了...

物流网站建设费用/关键词优化的作用

【目标检测-网络测试1】RFBNet代码复现及问题解决参考文章&#xff1a; &#xff08;1&#xff09;【目标检测-网络测试1】RFBNet代码复现及问题解决 &#xff08;2&#xff09;https://www.cnblogs.com/fanzhongjie/p/11504691.html &#xff08;3&#xff09;https://www.…...

杭州住房和城乡建设局网站首页/什么是引流推广

ps. 此文旨在纪录本人面试的经历&#xff0c;如有对文中的人或者公司有所冒犯&#xff0c;请海涵&#xff0c;不能接受者请mail我&#xff0c;说明情况&#xff0c;我将删掉相关文章 真的是学无止境&#xff0c;这次主要想总结下&#xff0c;近两次面试遇到的一些很细致的问题吧…...

郑州网站建设推广/今日国内新闻大事20条

查看本章节 查看作业目录 需求说明&#xff1a; 应用客户端和服务端通过 Eclipse 控制台的输入和显示实现简易的聊天功能 实现思路&#xff1a; 创建 Java 项目&#xff0c;在项目中创建服务端类 ChatServerThread 和客户端类 ChatClientThread 创建 Java 项目&#xff0c;在…...

外面网站怎么做/成人职业技术培训学校

参考回答&#xff1a; 1.设置一个生产者消费者队列&#xff0c;作为临界资源 2.初始化n个线程&#xff0c;并让其运行起来&#xff0c;加锁去队列取任务运行 3.当任务队列为空的时候&#xff0c;所有线程阻塞 4.当生产者队列来了一个任务后&#xff0c;先对队列加锁&#xff0…...

网站建设的目标与期望/东莞seo排名收费

Program Size: Code10848 RO-data780 RW-data372 ZI-data868 Code 表示程序代码指令部分 存放在Flash区 RO-data 表示 程序定义的常量 ,存放在Flash区 RW-data 表示 已初始化的全局变量、静态变量&#xff0c;值存放在Flash&#xff0c;变量存放RAM区 ZI-data 表示 未初始化的全…...