maxwell同步mysql到kafka(一个服务器启动多个)
创建mysql同步用户
CREATE USER 'maxwell'@'%' IDENTIFIED BY '123456';
GRANT ALL ON maxwell.* TO 'maxwell'@'%';
GRANT SELECT, REPLICATION CLIENT, REPLICATION SLAVE on *.* to 'maxwell'@'%';
开启mysql binlog
a.修改 /etc/my.cnf 配置
log-bin=mysql-bin # 开启binlog
binlog-format=ROW # 设置Binary Log记录方式为Row
server_id=1 # 记住id 后续开发会使用
# 指定binlog日志文件的名字为mysql-bin,以及其存储路径
# 如果没有对log-bin指定log文件,默认在 /var/lib/mysql目录下以mysqld-bin.00000X等作为名称。
# 而 mysqld-bin.index则记录了所有的log的文件名称
# 使用时则使用mysqlbinlog /var/lib/mysql|grep "*****"等来追踪database的操作。
log-bin=/var/lib/mysql/mysql-bin
datadir=/var/lib/mysql
socket=/var/lib/mysql/mysql.sock
b.重启mysql
service mysqld restart
c.查看开启状态
输入 show variables like ‘log_bin’; 查看binlog开启状态。如下图所示。
输入 show variables like ‘binlog_format’; 查看Binary Log记录方式。如下图所示。
mysql> show variables like 'log_%';
+---------------------------------+-------------+
| Variable_name | Value |
+---------------------------------+-------------+
| log_bin | ON |
| log_bin_trust_function_creators | OFF |
| log_error | .\mysql.err |
| log_queries_not_using_indexes | OFF |
| log_slave_updates | OFF |
| log_slow_queries | ON |
| log_warnings | 1 |
+---------------------------------+-------------+
没有开启log_bin的值是OFF,开启之后是ON
mysql> SHOW VARIABLES LIKE 'character%';
+--------------------------+----------------------------+
| Variable_name | Value |
+--------------------------+----------------------------+
| character_set_client | utf8 |
| character_set_connection | utf8 |
| character_set_database | latin1 |
| character_set_filesystem | binary |
| character_set_results | utf8 |
| character_set_server | latin1 |
| character_set_system | utf8 |
| character_sets_dir | /usr/share/mysql/charsets/ |
+--------------------------+----------------------------+
安装maxwell
下载
从 v1.30.0 开始,Maxwell 不再支持 JDK1.8
使用最后一个支持 JDK1.8 版本的 Maxwell v1.29.2 进行部署
wget https://github.com/zendesk/maxwell/releases/download/v1.29.2/maxwell-1.29.2.tar.gz
安装
解压即可 tar -zxvf maxwell.tar.gz
配置
vim config_1.properties
server_id=1
client_id=city_ct_63 #用于启动多个maxwell
replica_server_id=2 #用于启动多个maxwell
user=maxwell
password=123456
host=192.168.0.XX
producer=kafka
kafka.bootstrap.servers=192.168.0.XX:9092
kafka_topic=city_mysql_kafka_cdc
jdbc_options=serverTimezone=UTC
filter=exclude:*.*,include:test.mj_qyxx,include:test.mj_sbxx
vim config_2.properties
server_id=1
client_id=province_ct_63 #用于启动多个maxwell
replica_server_id=1 #用于启动多个maxwell
user=maxwell
password=123456
host=192.168.0.1x
producer=kafka
kafka.bootstrap.servers=192.168.0.xx:9092
kafka_topic=province_mysql_kafka_cdc
jdbc_options=serverTimezone=UTC
filter=exclude:*.*,include:test2.xxx
启动
bin/maxwell --config city_config.properties --daemon
bin/maxwell --config province_config.properties --daemon
验证启动进程
ps -ef | grep com.zendesk.maxwell.Maxwell | grep -v grep | wc -l
测试数据库全量同步
maxwell-bootstrap --database finance_result --table industry --config $MAXWELL_HOME/config.properties
其中发送过来的数据第一行及最后一行数据是标识 Maxwell 历史数据同步的,不携带任何数据。
maxwell -> kafka:
{"database": "finance_result","table": "industry","type": "bootstrap-start","ts": 1694748250,"data": {}
}{"database": "finance_result","table": "industry","type": "bootstrap-insert","ts": 1694748250,"data": {"id": 1,"create_time": "2022-08-19 00:00:00.000000","update_time": "2022-08-19 00:00:00.000000","industry_level": 1,"industry_name": "工程建设","superior_industry_id": null}
} {"database": "finance_result","table": "industry","type": "bootstrap-insert","ts": 1694748250,"data": {"id": 2,"create_time": "2022-08-19 00:00:00.000000","update_time": "2022-08-19 00:00:00.000000","industry_level": 1,"industry_name": "轻工","superior_industry_id": null}
} {"database": "finance_result","table": "industry","type": "bootstrap-insert","ts": 1694748250,"data": {"id": 3,"create_time": "2022-08-19 00:00:00.000000","update_time": "2022-08-19 00:00:00.000000","industry_level": 2,"industry_name": "土木","superior_industry_id": 1}
}
......{"database": "finance_result","table": "industry","type": "bootstrap-complete","ts": 1694748250,"data": {}
}
测试数据库增量同步
参数说明
输出JSON字符串的格式
● data 最新的数据,修改后的数据
● old 旧数据,修改前的数据
● type 操作类型,有insert, update, delete, database-create, database-alter, database-drop, table-create, table-alter, table-drop,bootstrap-insert,int(未知类型)
● xid 事务id
● commit 同一个xid代表同一个事务,事务的最后一条语句会有commit,可以利用这个重现事务
● server_id
● thread_id
● 运行程序时添加参数–output_ddl,可以捕捉到ddl语句
● datetime列会输出为"YYYY-MM-DD hh:mm:ss",如果遇到"0000-00-00 00:00:00"会原样输出
● maxwell支持多种编码,但仅输出utf8编码
● maxwell的TIMESTAMP总是作为UTC处理,如果要调整为自己的时区,需要在后端逻辑上进行处理
创建mysql同步用户
CREATE USER 'maxwell'@'%' IDENTIFIED BY '123456';
GRANT ALL ON maxwell.* TO 'maxwell'@'%';
GRANT SELECT, REPLICATION CLIENT, REPLICATION SLAVE on *.* to 'maxwell'@'%';
开启mysql binlog
a.修改 /etc/my.cnf 配置
log-bin=mysql-bin # 开启binlog
binlog-format=ROW # 设置Binary Log记录方式为Row
server_id=1 # 记住id 后续开发会使用
# 指定binlog日志文件的名字为mysql-bin,以及其存储路径
# 如果没有对log-bin指定log文件,默认在 /var/lib/mysql目录下以mysqld-bin.00000X等作为名称。
# 而 mysqld-bin.index则记录了所有的log的文件名称
# 使用时则使用mysqlbinlog /var/lib/mysql|grep "*****"等来追踪database的操作。
log-bin=/var/lib/mysql/mysql-bin
datadir=/var/lib/mysql
socket=/var/lib/mysql/mysql.sock
b.重启mysql
service mysqld restart
c.查看开启状态
输入 show variables like ‘log_bin’; 查看binlog开启状态。如下图所示。
输入 show variables like ‘binlog_format’; 查看Binary Log记录方式。如下图所示。
mysql> show variables like 'log_%';
+---------------------------------+-------------+
| Variable_name | Value |
+---------------------------------+-------------+
| log_bin | ON |
| log_bin_trust_function_creators | OFF |
| log_error | .\mysql.err |
| log_queries_not_using_indexes | OFF |
| log_slave_updates | OFF |
| log_slow_queries | ON |
| log_warnings | 1 |
+---------------------------------+-------------+
没有开启log_bin的值是OFF,开启之后是ON
mysql> SHOW VARIABLES LIKE 'character%';
+--------------------------+----------------------------+
| Variable_name | Value |
+--------------------------+----------------------------+
| character_set_client | utf8 |
| character_set_connection | utf8 |
| character_set_database | latin1 |
| character_set_filesystem | binary |
| character_set_results | utf8 |
| character_set_server | latin1 |
| character_set_system | utf8 |
| character_sets_dir | /usr/share/mysql/charsets/ |
+--------------------------+----------------------------+
安装maxwell
下载
从 v1.30.0 开始,Maxwell 不再支持 JDK1.8
使用最后一个支持 JDK1.8 版本的 Maxwell v1.29.2 进行部署
wget https://github.com/zendesk/maxwell/releases/download/v1.29.2/maxwell-1.29.2.tar.gz
安装
解压即可 tar -zxvf maxwell.tar.gz
配置
vim config_1.properties
server_id=1
client_id=city_ct_63 #用于启动多个maxwell
replica_server_id=2 #用于启动多个maxwell
user=maxwell
password=123456
host=192.168.0.XX
producer=kafka
kafka.bootstrap.servers=192.168.0.XX:9092
kafka_topic=city_mysql_kafka_cdc
jdbc_options=serverTimezone=UTC
filter=exclude:*.*,include:test.mj_qyxx,include:test.mj_sbxx
vim config_2.properties
server_id=1
client_id=province_ct_63 #用于启动多个maxwell
replica_server_id=1 #用于启动多个maxwell
user=maxwell
password=123456
host=192.168.0.1x
producer=kafka
kafka.bootstrap.servers=192.168.0.xx:9092
kafka_topic=province_mysql_kafka_cdc
jdbc_options=serverTimezone=UTC
filter=exclude:*.*,include:test2.xxx
启动
bin/maxwell --config city_config.properties --daemon
bin/maxwell --config province_config.properties --daemon
验证启动进程
ps -ef | grep com.zendesk.maxwell.Maxwell | grep -v grep | wc -l
测试数据库全量同步
maxwell-bootstrap --database finance_result --table industry --config $MAXWELL_HOME/config.properties
其中发送过来的数据第一行及最后一行数据是标识 Maxwell 历史数据同步的,不携带任何数据。
maxwell -> kafka:
{"database": "finance_result","table": "industry","type": "bootstrap-start","ts": 1694748250,"data": {}
}{"database": "finance_result","table": "industry","type": "bootstrap-insert","ts": 1694748250,"data": {"id": 1,"create_time": "2022-08-19 00:00:00.000000","update_time": "2022-08-19 00:00:00.000000","industry_level": 1,"industry_name": "工程建设","superior_industry_id": null}
} {"database": "finance_result","table": "industry","type": "bootstrap-insert","ts": 1694748250,"data": {"id": 2,"create_time": "2022-08-19 00:00:00.000000","update_time": "2022-08-19 00:00:00.000000","industry_level": 1,"industry_name": "轻工","superior_industry_id": null}
} {"database": "finance_result","table": "industry","type": "bootstrap-insert","ts": 1694748250,"data": {"id": 3,"create_time": "2022-08-19 00:00:00.000000","update_time": "2022-08-19 00:00:00.000000","industry_level": 2,"industry_name": "土木","superior_industry_id": 1}
}
......{"database": "finance_result","table": "industry","type": "bootstrap-complete","ts": 1694748250,"data": {}
}
测试数据库增量同步
参数说明
输出JSON字符串的格式
● data 最新的数据,修改后的数据
● old 旧数据,修改前的数据
● type 操作类型,有insert, update, delete, database-create, database-alter, database-drop, table-create, table-alter, table-drop,bootstrap-insert,int(未知类型)
● xid 事务id
● commit 同一个xid代表同一个事务,事务的最后一条语句会有commit,可以利用这个重现事务
● server_id
● thread_id
● 运行程序时添加参数–output_ddl,可以捕捉到ddl语句
● datetime列会输出为"YYYY-MM-DD hh:mm:ss",如果遇到"0000-00-00 00:00:00"会原样输出
● maxwell支持多种编码,但仅输出utf8编码
● maxwell的TIMESTAMP总是作为UTC处理,如果要调整为自己的时区,需要在后端逻辑上进行处理
相关文章:
maxwell同步mysql到kafka(一个服务器启动多个)
创建mysql同步用户 CREATE USER maxwell% IDENTIFIED BY 123456; GRANT ALL ON maxwell.* TO maxwell%; GRANT SELECT, REPLICATION CLIENT, REPLICATION SLAVE on *.* to maxwell%; 开启mysql binlog a.修改 /etc/my.cnf 配置 log-binmysql-bin # 开启binlog binlog-forma…...
实用软件分享---简单菜谱 0.3版本 几千种美食(安卓)
专栏介绍:本专栏主要分享一些实用的软件(Po Jie版); 声明1:软件不保证时效性;只能保证在写本文时,该软件是可用的;不保证后续时间该软件能一直正常运行;不保证没有bug;如果软件不可用了,我知道后会第一时间在题目上注明(已失效)。介意者请勿订阅。 声明2:本专栏的…...
网络学习(14)|RESTful API设计:构建优雅的Web服务
文章目录 设计原则最佳实践命名与URI设计状态码与响应格式HTTP状态码详解响应格式选择 在当今的互联网世界中,RESTful API已成为构建可扩展、可维护和高性能Web服务的标准。本文将深入探讨RESTful API的设计原则、资源命名与URI设计的最佳实践,以及请求与…...
【开源】APIJSON 框架
简述 APIJSON是一个关于API和JSON的综合技术或框架,一种专为API设计的JSON网络传输协议,以及基于这套协议实现的ORM库。 1. 定义与特点: APIJSON是一种基于接口的JSON传输结构协议,它允许客户端定义任何JSON结构来向服务端发起…...
R语言探索与分析18-基于时间序列的汇率预测
一、研究背景与意义 汇率是指两个国家之间的货币兑换比率,而且在国家与国家的经济交流有着举足轻重的作用。随着经济全球化的不断深入,在整个全球经济体中,汇率还是一个评估国家与国家之间的经济状况和发展水平的一个风向标。汇率的变动会对…...
30岁迷茫?AI赛道,人生新起点
前言 30岁,对于许多人来说,是一个人生的分水岭。在这个年纪,有些人可能已经在某个领域取得了不小的成就,而有些人则可能开始对未来的职业方向感到迷茫。如果你正处于这个阶段,那么你可能会问自己:30岁转行…...
开门预警系统技术规范(简化版)
开门预警系统技术规范(简化版) 1 系统概述2 预警区域3 预警目标4 功能需求5 功能条件6 显示需求7 指标需求1 系统概述 开门预警系统(DOW),在自车停止开门过程中,安装在车辆的传感器(如安装在车辆后保险杆两个角雷达)检测从自车后方接近的目标车(汽车、摩托车等)的相对…...
Django与MySQL:配置数据库的详细步骤
文章目录 Django-MySQL 配置配置完执行数据迁移,如果报错: Error loading MySQLdb module, Django-MySQL 配置 # settings.pyDATABASES {# 默认配置sqlite3数据库# default: {# ENGINE: django.db.backends.sqlite3,# NAME: BASE_DIR / db.sqli…...
GPT-4o short description
GPT-4o,作为OpenAI最新推出的人工智能模型,无疑在人工智能领域掀起了新的波澜。 一、版本间的对比分析 与前一版本GPT-4相比,GPT-4o在多个方面进行了显著的改进和优化。首先,在参数规模上,GPT-4o达到了2000亿个参数&…...
MATLAB 矩阵
创建矩阵直接输入:使用 zeros, ones, eye 函数:使用 rand, randi 函数:使用 diag 函数: 矩阵操作矩阵加法和减法:矩阵乘法:矩阵转置:矩阵求逆:矩阵分解:矩阵大小…...
LED灯的功率以及好的品牌推荐
LED灯的功率选择主要根据使用场景、照明需求以及灯具类型来决定。常见的LED灯功率范围在0.5W到100W之间,不同的功率范围适用于不同的场景。 对于小型照明设备,如小夜灯或手电筒,通常选择0.5W到3W的LED灯,足以满足基本的照明需求。…...
Linux “ 软件管理 “
软件管理 widows 安装 方法一: 双击exe安装包,就可以安装。 用exe安装的软件会破记录到注册表中。 注册会记录安装位置,软件名称。 方法二: 用绿色方式进行安装。 不用写到注册表中,因此无法在开始菜单里面查看和卸…...
【uni-app】申请高德地图key,封装map.js,实现H5、iOS、Android通过getlocation获取地图定位信息
文章目录 map组件基础使用封装map.js,实现定位1、使用第三方地图:高德,申请对应平台key1、申请H5 key2、申请微信小程序 key3、申请android key查看证书详情,可以看到SHA1查看/设置Android包名 4、申请ios key 2、封装map1、lib/m…...
使用rufus做Kali Linux时持久分区大小如何设置
持久分区大小是什么意思: 持久分区大小指的是在U盘安装引导Kali Linux时,为保存Kali修改后的设置(如中文界面显示等)而预留的空间大小。这个空间相当于电脑中的D盘,用于保存修改后的设置。 而剩下的空间则用于安装Kali…...
Java高阶数据结构-----并查集(详解)
目录 🧐一.并查集的基本概念&实例: 🤪二.并查集代码: 😂三:并查集的一些习题: A.省份数量 B.等式方程的可满足性 🧐一.并查集的基本概念&实例: 并查集概念&…...
GitLab教程(三):多人合作场景下如何pull代码和处理冲突
文章目录 1.拉取别人同步的代码到本地的流程2.push冲突发生场景情景模拟简单的解决方法 在这一章中,为了模拟多人合作的场景,我需要一个人分饰两角。 执行git clone xx远端仓库地址 xx文件夹命令,在clone代码时指定本地仓库的文件夹名&#…...
模版偏特化之std::enable_if
1 SFINAE。 2 条件特化。可用作额外的函数参数(不可应用于运算符重载)、返回类型(不可应用于构造函数与析构函数),或类模板或函数模板形参。 函数参数: #include <iostream> #include <type_tra…...
好用的Web数据库管理工具推荐(ChatGPT的推荐)
在现代数据管理和开发中,Web数据库管理工具变得越来越重要。这些工具不仅提供了直观的用户界面,还支持跨平台操作,方便用户在任何地方进行数据库管理。 目录 1. SQLynx 2. phpMyAdmin 3. Adminer 4. DBeaver 5 结论 以下是几款推荐的Web…...
encoding Token和embedding 傻傻分不清楚?
encoding 编码 “encoding” 是一个在计算机科学和人工智能领域广泛使用的术语,它可以指代多种不同的过程和方法。核心就是编码:用某些数字来表示特定的信息。当然你或许会说字符集(Unicode)更理解这种概念,编码更强调这种动态的过程。而字符…...
一个公用的数据状态修改组件
灵感来自于一项重复的工作,下图中,这类禁用启用、审核通过不通过、设计成是什么状态否什么状态的场景很多。每一个都需要单独提供接口。重复工作还蛮大的。于是,基于该组件类捕获组件跳转写了这款通用接口。省时省力。 代码如下:…...
[python]yfinance国内不能使用
yfinance国内不能使用,可以使用tushare、akshare代替 import yfinance as yf# 输入股票代码 stock_symbol AAPL # 替换为你想要查询的股票代码# 获取股票数据 data yf.download(stock_symbol)# 打印实时数据 print(data) pip install akshare import akshare …...
Frontiers旗下期刊,23年分区表整理出炉!它还值得投吗?
本周投稿推荐 SSCI • 中科院2区,6.0-7.0(录用友好) EI • 各领域沾边均可(2天录用) CNKI • 7天录用-检索(急录友好) SCI&EI • 4区生物医学类,0.5-1.0(录用…...
基于JSP的毕业生就业信息管理系统
开头语: 你好,我是专注于信息系统开发的学长猫哥。如果您对毕业生就业信息管理或相关技术感兴趣,欢迎联系我交流。 开发语言: JSP 数据库: MySQL 技术: JSP技术 SSM框架 工具: Eclips…...
CDN、CNAME、DNS
CDN、CNAME、DNS 域名解析是将域名转换为IP地址的过程。当用户在浏览器中输入域名时,计算机需要在DNS系统中找到对应的IP地址,以便能够访问该网站。 CDN(Content Delivery Network,内容分发网络)是一种用于加速网站访…...
直播商城源码-PC+APP+H5+小程序现成源码
随着电商行业的不断演进,直播商城已成为连接消费者和商品的新兴桥梁。直播商城源码提供了一个完整的解决方案,使得企业能够迅速搭建起一个覆盖PC、APP、H5和小程序的全渠道电商平台。本文将探讨直播商城源码的优势、关键功能以及如何选择适合的现成源码。…...
16. 《C语言》——【牛客网BC124 —— BC130题目讲解】
亲爱的读者,大家好!我是一名正在学习编程的高校生。在这个博客里,我将和大家一起探讨编程技巧、分享实用工具,并交流学习心得。希望通过我的博客,你能学到有用的知识,提高自己的技能,成为一名优…...
Docker 国内镜像源更换
实现 替换docker 镜像源 前提要求 安装 docker docker-compose 参考创建一键更换docker国内镜像源 Docker 镜像代理DaoCloud 镜像站百度云 https://mirror.baidubce.com南京大学镜像站...
python07
__init__.py from . import p1 from . import p2 # 理解:import p2 先导入 p2 文件, 然后该文件的内容全要 from . # # 告诉调用者,哪些文件需要使用 p1.py def sum(a,b):print(a b) p2.py def max(a,b):if a > b:print(a)else:pri…...
【CTS】android CTS测试
android CTS测试 1.硬件准备2. 软件准备3. 下载 CTS3.1 cts3.2 解压 CTS 包: 4 配置adb fastboot5 检查 Java 版本6 安装aapt26.1 下载并安装 Android SDK6.2 找到 aapt2 工具6.3 配置环境变量 7. 准备测试设备8. 运行 CTS 测试8.1 启动 CTS: 9. 查看测试…...
【雷丰阳-谷粒商城 】【分布式基础篇-全栈开发篇】【08】【商品服务】Object划分_批量删除
持续学习&持续更新中… 守破离 【雷丰阳-谷粒商城 】【分布式基础篇-全栈开发篇】【08】【商品服务】Object划分_批量删除 Object划分批量删除/添加参考 Object划分 数据库中对于一张表的数据,由于拥有隐私字段、多余字段、字段过少等原因,不应该直…...
做网站之前要备案是什么意思/全国教育培训机构平台
Spring框架已经成为Java开发人员的必备知识,而且Spring 3引入了强大的新特性,例如SpEL、Spring 表达式语言、loC 容器的新注解以及用户急需的对REST的支持。无论你是刚刚接触Spring还是被 Spring 3.0的新特性所吸引,这份笔记都是掌握Spring的…...
南京美容网站建设/全渠道营销案例
著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。作者:张俊博链接:https://www.zhihu.com/question/22298352/answer/34267457来源:知乎有那么麻烦吗?不推荐用“反转/翻转/反褶/对称”等解释卷积…...
地方政府门户网站的建设/在线建站网页制作网站建设平台
在Windows server 2003上配置DHCP服务<?xml:namespace prefix o ns "urn:schemas-microsoft-com:office:office" />实验二DHCP中继代理配置★ 实验环境1.准备三台电脑,一台作为客户机(XP),一台为DH…...
千灯做网站/sem推广
在我之前的分享中提到,我们在家里或者公司内部使用的IP地址是私有地址。这些地址在internet网中也就是公网中是不合法的地址,也就是说你如果使用一个私网地址是不可能登录到internet中浏览信息的。那么为什么我们的终端(电脑、手机࿰…...
欧美最火的社交网站怎么做/谷歌浏览器网址
前言在调用GeoServer服务之前,需了解OGC制订的Web服务类型:WMS、WFS、WCS、WMTS,百度即可,这里与ArcGIS Server发布的服务类型做一个类比:本文以WFS为例进行说明。GeoServer版本 2.13.2WFSLayerArcGIS JS API的示例加载…...
织梦做中英文企业网站/俄罗斯搜索引擎入口
1环境 Win10,ANACONDA3(64-bit),Python3.6.2。ANACONDA Prompt中不能用pip命令安装包,并且是在安装了TensorFlow后才发生的。 TypeError: parse() got an unexpected keyword argument ‘transport_encoding’ 2错误信息 报错如下&#…...