maxwell同步mysql到kafka(一个服务器启动多个)
创建mysql同步用户
CREATE USER 'maxwell'@'%' IDENTIFIED BY '123456';
GRANT ALL ON maxwell.* TO 'maxwell'@'%';
GRANT SELECT, REPLICATION CLIENT, REPLICATION SLAVE on *.* to 'maxwell'@'%';
开启mysql binlog
a.修改 /etc/my.cnf 配置
log-bin=mysql-bin # 开启binlog
binlog-format=ROW # 设置Binary Log记录方式为Row
server_id=1 # 记住id 后续开发会使用
# 指定binlog日志文件的名字为mysql-bin,以及其存储路径
# 如果没有对log-bin指定log文件,默认在 /var/lib/mysql目录下以mysqld-bin.00000X等作为名称。
# 而 mysqld-bin.index则记录了所有的log的文件名称
# 使用时则使用mysqlbinlog /var/lib/mysql|grep "*****"等来追踪database的操作。
log-bin=/var/lib/mysql/mysql-bin
datadir=/var/lib/mysql
socket=/var/lib/mysql/mysql.sock
b.重启mysql
service mysqld restart
c.查看开启状态
输入 show variables like ‘log_bin’; 查看binlog开启状态。如下图所示。
输入 show variables like ‘binlog_format’; 查看Binary Log记录方式。如下图所示。
mysql> show variables like 'log_%';
+---------------------------------+-------------+
| Variable_name | Value |
+---------------------------------+-------------+
| log_bin | ON |
| log_bin_trust_function_creators | OFF |
| log_error | .\mysql.err |
| log_queries_not_using_indexes | OFF |
| log_slave_updates | OFF |
| log_slow_queries | ON |
| log_warnings | 1 |
+---------------------------------+-------------+
没有开启log_bin的值是OFF,开启之后是ON
mysql> SHOW VARIABLES LIKE 'character%';
+--------------------------+----------------------------+
| Variable_name | Value |
+--------------------------+----------------------------+
| character_set_client | utf8 |
| character_set_connection | utf8 |
| character_set_database | latin1 |
| character_set_filesystem | binary |
| character_set_results | utf8 |
| character_set_server | latin1 |
| character_set_system | utf8 |
| character_sets_dir | /usr/share/mysql/charsets/ |
+--------------------------+----------------------------+
安装maxwell
下载
从 v1.30.0 开始,Maxwell 不再支持 JDK1.8
使用最后一个支持 JDK1.8 版本的 Maxwell v1.29.2 进行部署
wget https://github.com/zendesk/maxwell/releases/download/v1.29.2/maxwell-1.29.2.tar.gz
安装
解压即可 tar -zxvf maxwell.tar.gz
配置
vim config_1.properties
server_id=1
client_id=city_ct_63 #用于启动多个maxwell
replica_server_id=2 #用于启动多个maxwell
user=maxwell
password=123456
host=192.168.0.XX
producer=kafka
kafka.bootstrap.servers=192.168.0.XX:9092
kafka_topic=city_mysql_kafka_cdc
jdbc_options=serverTimezone=UTC
filter=exclude:*.*,include:test.mj_qyxx,include:test.mj_sbxx
vim config_2.properties
server_id=1
client_id=province_ct_63 #用于启动多个maxwell
replica_server_id=1 #用于启动多个maxwell
user=maxwell
password=123456
host=192.168.0.1x
producer=kafka
kafka.bootstrap.servers=192.168.0.xx:9092
kafka_topic=province_mysql_kafka_cdc
jdbc_options=serverTimezone=UTC
filter=exclude:*.*,include:test2.xxx
启动
bin/maxwell --config city_config.properties --daemon
bin/maxwell --config province_config.properties --daemon
验证启动进程
ps -ef | grep com.zendesk.maxwell.Maxwell | grep -v grep | wc -l
测试数据库全量同步
maxwell-bootstrap --database finance_result --table industry --config $MAXWELL_HOME/config.properties
其中发送过来的数据第一行及最后一行数据是标识 Maxwell 历史数据同步的,不携带任何数据。
maxwell -> kafka:
{"database": "finance_result","table": "industry","type": "bootstrap-start","ts": 1694748250,"data": {}
}{"database": "finance_result","table": "industry","type": "bootstrap-insert","ts": 1694748250,"data": {"id": 1,"create_time": "2022-08-19 00:00:00.000000","update_time": "2022-08-19 00:00:00.000000","industry_level": 1,"industry_name": "工程建设","superior_industry_id": null}
} {"database": "finance_result","table": "industry","type": "bootstrap-insert","ts": 1694748250,"data": {"id": 2,"create_time": "2022-08-19 00:00:00.000000","update_time": "2022-08-19 00:00:00.000000","industry_level": 1,"industry_name": "轻工","superior_industry_id": null}
} {"database": "finance_result","table": "industry","type": "bootstrap-insert","ts": 1694748250,"data": {"id": 3,"create_time": "2022-08-19 00:00:00.000000","update_time": "2022-08-19 00:00:00.000000","industry_level": 2,"industry_name": "土木","superior_industry_id": 1}
}
......{"database": "finance_result","table": "industry","type": "bootstrap-complete","ts": 1694748250,"data": {}
}
测试数据库增量同步
参数说明
输出JSON字符串的格式
● data 最新的数据,修改后的数据
● old 旧数据,修改前的数据
● type 操作类型,有insert, update, delete, database-create, database-alter, database-drop, table-create, table-alter, table-drop,bootstrap-insert,int(未知类型)
● xid 事务id
● commit 同一个xid代表同一个事务,事务的最后一条语句会有commit,可以利用这个重现事务
● server_id
● thread_id
● 运行程序时添加参数–output_ddl,可以捕捉到ddl语句
● datetime列会输出为"YYYY-MM-DD hh:mm:ss",如果遇到"0000-00-00 00:00:00"会原样输出
● maxwell支持多种编码,但仅输出utf8编码
● maxwell的TIMESTAMP总是作为UTC处理,如果要调整为自己的时区,需要在后端逻辑上进行处理
创建mysql同步用户
CREATE USER 'maxwell'@'%' IDENTIFIED BY '123456';
GRANT ALL ON maxwell.* TO 'maxwell'@'%';
GRANT SELECT, REPLICATION CLIENT, REPLICATION SLAVE on *.* to 'maxwell'@'%';
开启mysql binlog
a.修改 /etc/my.cnf 配置
log-bin=mysql-bin # 开启binlog
binlog-format=ROW # 设置Binary Log记录方式为Row
server_id=1 # 记住id 后续开发会使用
# 指定binlog日志文件的名字为mysql-bin,以及其存储路径
# 如果没有对log-bin指定log文件,默认在 /var/lib/mysql目录下以mysqld-bin.00000X等作为名称。
# 而 mysqld-bin.index则记录了所有的log的文件名称
# 使用时则使用mysqlbinlog /var/lib/mysql|grep "*****"等来追踪database的操作。
log-bin=/var/lib/mysql/mysql-bin
datadir=/var/lib/mysql
socket=/var/lib/mysql/mysql.sock
b.重启mysql
service mysqld restart
c.查看开启状态
输入 show variables like ‘log_bin’; 查看binlog开启状态。如下图所示。
输入 show variables like ‘binlog_format’; 查看Binary Log记录方式。如下图所示。
mysql> show variables like 'log_%';
+---------------------------------+-------------+
| Variable_name | Value |
+---------------------------------+-------------+
| log_bin | ON |
| log_bin_trust_function_creators | OFF |
| log_error | .\mysql.err |
| log_queries_not_using_indexes | OFF |
| log_slave_updates | OFF |
| log_slow_queries | ON |
| log_warnings | 1 |
+---------------------------------+-------------+
没有开启log_bin的值是OFF,开启之后是ON
mysql> SHOW VARIABLES LIKE 'character%';
+--------------------------+----------------------------+
| Variable_name | Value |
+--------------------------+----------------------------+
| character_set_client | utf8 |
| character_set_connection | utf8 |
| character_set_database | latin1 |
| character_set_filesystem | binary |
| character_set_results | utf8 |
| character_set_server | latin1 |
| character_set_system | utf8 |
| character_sets_dir | /usr/share/mysql/charsets/ |
+--------------------------+----------------------------+
安装maxwell
下载
从 v1.30.0 开始,Maxwell 不再支持 JDK1.8
使用最后一个支持 JDK1.8 版本的 Maxwell v1.29.2 进行部署
wget https://github.com/zendesk/maxwell/releases/download/v1.29.2/maxwell-1.29.2.tar.gz
安装
解压即可 tar -zxvf maxwell.tar.gz
配置
vim config_1.properties
server_id=1
client_id=city_ct_63 #用于启动多个maxwell
replica_server_id=2 #用于启动多个maxwell
user=maxwell
password=123456
host=192.168.0.XX
producer=kafka
kafka.bootstrap.servers=192.168.0.XX:9092
kafka_topic=city_mysql_kafka_cdc
jdbc_options=serverTimezone=UTC
filter=exclude:*.*,include:test.mj_qyxx,include:test.mj_sbxx
vim config_2.properties
server_id=1
client_id=province_ct_63 #用于启动多个maxwell
replica_server_id=1 #用于启动多个maxwell
user=maxwell
password=123456
host=192.168.0.1x
producer=kafka
kafka.bootstrap.servers=192.168.0.xx:9092
kafka_topic=province_mysql_kafka_cdc
jdbc_options=serverTimezone=UTC
filter=exclude:*.*,include:test2.xxx
启动
bin/maxwell --config city_config.properties --daemon
bin/maxwell --config province_config.properties --daemon
验证启动进程
ps -ef | grep com.zendesk.maxwell.Maxwell | grep -v grep | wc -l
测试数据库全量同步
maxwell-bootstrap --database finance_result --table industry --config $MAXWELL_HOME/config.properties
其中发送过来的数据第一行及最后一行数据是标识 Maxwell 历史数据同步的,不携带任何数据。
maxwell -> kafka:
{"database": "finance_result","table": "industry","type": "bootstrap-start","ts": 1694748250,"data": {}
}{"database": "finance_result","table": "industry","type": "bootstrap-insert","ts": 1694748250,"data": {"id": 1,"create_time": "2022-08-19 00:00:00.000000","update_time": "2022-08-19 00:00:00.000000","industry_level": 1,"industry_name": "工程建设","superior_industry_id": null}
} {"database": "finance_result","table": "industry","type": "bootstrap-insert","ts": 1694748250,"data": {"id": 2,"create_time": "2022-08-19 00:00:00.000000","update_time": "2022-08-19 00:00:00.000000","industry_level": 1,"industry_name": "轻工","superior_industry_id": null}
} {"database": "finance_result","table": "industry","type": "bootstrap-insert","ts": 1694748250,"data": {"id": 3,"create_time": "2022-08-19 00:00:00.000000","update_time": "2022-08-19 00:00:00.000000","industry_level": 2,"industry_name": "土木","superior_industry_id": 1}
}
......{"database": "finance_result","table": "industry","type": "bootstrap-complete","ts": 1694748250,"data": {}
}
测试数据库增量同步
参数说明
输出JSON字符串的格式
● data 最新的数据,修改后的数据
● old 旧数据,修改前的数据
● type 操作类型,有insert, update, delete, database-create, database-alter, database-drop, table-create, table-alter, table-drop,bootstrap-insert,int(未知类型)
● xid 事务id
● commit 同一个xid代表同一个事务,事务的最后一条语句会有commit,可以利用这个重现事务
● server_id
● thread_id
● 运行程序时添加参数–output_ddl,可以捕捉到ddl语句
● datetime列会输出为"YYYY-MM-DD hh:mm:ss",如果遇到"0000-00-00 00:00:00"会原样输出
● maxwell支持多种编码,但仅输出utf8编码
● maxwell的TIMESTAMP总是作为UTC处理,如果要调整为自己的时区,需要在后端逻辑上进行处理
相关文章:
maxwell同步mysql到kafka(一个服务器启动多个)
创建mysql同步用户 CREATE USER maxwell% IDENTIFIED BY 123456; GRANT ALL ON maxwell.* TO maxwell%; GRANT SELECT, REPLICATION CLIENT, REPLICATION SLAVE on *.* to maxwell%; 开启mysql binlog a.修改 /etc/my.cnf 配置 log-binmysql-bin # 开启binlog binlog-forma…...
实用软件分享---简单菜谱 0.3版本 几千种美食(安卓)
专栏介绍:本专栏主要分享一些实用的软件(Po Jie版); 声明1:软件不保证时效性;只能保证在写本文时,该软件是可用的;不保证后续时间该软件能一直正常运行;不保证没有bug;如果软件不可用了,我知道后会第一时间在题目上注明(已失效)。介意者请勿订阅。 声明2:本专栏的…...
网络学习(14)|RESTful API设计:构建优雅的Web服务
文章目录 设计原则最佳实践命名与URI设计状态码与响应格式HTTP状态码详解响应格式选择 在当今的互联网世界中,RESTful API已成为构建可扩展、可维护和高性能Web服务的标准。本文将深入探讨RESTful API的设计原则、资源命名与URI设计的最佳实践,以及请求与…...
【开源】APIJSON 框架
简述 APIJSON是一个关于API和JSON的综合技术或框架,一种专为API设计的JSON网络传输协议,以及基于这套协议实现的ORM库。 1. 定义与特点: APIJSON是一种基于接口的JSON传输结构协议,它允许客户端定义任何JSON结构来向服务端发起…...
R语言探索与分析18-基于时间序列的汇率预测
一、研究背景与意义 汇率是指两个国家之间的货币兑换比率,而且在国家与国家的经济交流有着举足轻重的作用。随着经济全球化的不断深入,在整个全球经济体中,汇率还是一个评估国家与国家之间的经济状况和发展水平的一个风向标。汇率的变动会对…...
30岁迷茫?AI赛道,人生新起点
前言 30岁,对于许多人来说,是一个人生的分水岭。在这个年纪,有些人可能已经在某个领域取得了不小的成就,而有些人则可能开始对未来的职业方向感到迷茫。如果你正处于这个阶段,那么你可能会问自己:30岁转行…...
开门预警系统技术规范(简化版)
开门预警系统技术规范(简化版) 1 系统概述2 预警区域3 预警目标4 功能需求5 功能条件6 显示需求7 指标需求1 系统概述 开门预警系统(DOW),在自车停止开门过程中,安装在车辆的传感器(如安装在车辆后保险杆两个角雷达)检测从自车后方接近的目标车(汽车、摩托车等)的相对…...
Django与MySQL:配置数据库的详细步骤
文章目录 Django-MySQL 配置配置完执行数据迁移,如果报错: Error loading MySQLdb module, Django-MySQL 配置 # settings.pyDATABASES {# 默认配置sqlite3数据库# default: {# ENGINE: django.db.backends.sqlite3,# NAME: BASE_DIR / db.sqli…...
GPT-4o short description
GPT-4o,作为OpenAI最新推出的人工智能模型,无疑在人工智能领域掀起了新的波澜。 一、版本间的对比分析 与前一版本GPT-4相比,GPT-4o在多个方面进行了显著的改进和优化。首先,在参数规模上,GPT-4o达到了2000亿个参数&…...
MATLAB 矩阵
创建矩阵直接输入:使用 zeros, ones, eye 函数:使用 rand, randi 函数:使用 diag 函数: 矩阵操作矩阵加法和减法:矩阵乘法:矩阵转置:矩阵求逆:矩阵分解:矩阵大小…...
LED灯的功率以及好的品牌推荐
LED灯的功率选择主要根据使用场景、照明需求以及灯具类型来决定。常见的LED灯功率范围在0.5W到100W之间,不同的功率范围适用于不同的场景。 对于小型照明设备,如小夜灯或手电筒,通常选择0.5W到3W的LED灯,足以满足基本的照明需求。…...
Linux “ 软件管理 “
软件管理 widows 安装 方法一: 双击exe安装包,就可以安装。 用exe安装的软件会破记录到注册表中。 注册会记录安装位置,软件名称。 方法二: 用绿色方式进行安装。 不用写到注册表中,因此无法在开始菜单里面查看和卸…...
【uni-app】申请高德地图key,封装map.js,实现H5、iOS、Android通过getlocation获取地图定位信息
文章目录 map组件基础使用封装map.js,实现定位1、使用第三方地图:高德,申请对应平台key1、申请H5 key2、申请微信小程序 key3、申请android key查看证书详情,可以看到SHA1查看/设置Android包名 4、申请ios key 2、封装map1、lib/m…...
使用rufus做Kali Linux时持久分区大小如何设置
持久分区大小是什么意思: 持久分区大小指的是在U盘安装引导Kali Linux时,为保存Kali修改后的设置(如中文界面显示等)而预留的空间大小。这个空间相当于电脑中的D盘,用于保存修改后的设置。 而剩下的空间则用于安装Kali…...
Java高阶数据结构-----并查集(详解)
目录 🧐一.并查集的基本概念&实例: 🤪二.并查集代码: 😂三:并查集的一些习题: A.省份数量 B.等式方程的可满足性 🧐一.并查集的基本概念&实例: 并查集概念&…...
GitLab教程(三):多人合作场景下如何pull代码和处理冲突
文章目录 1.拉取别人同步的代码到本地的流程2.push冲突发生场景情景模拟简单的解决方法 在这一章中,为了模拟多人合作的场景,我需要一个人分饰两角。 执行git clone xx远端仓库地址 xx文件夹命令,在clone代码时指定本地仓库的文件夹名&#…...
模版偏特化之std::enable_if
1 SFINAE。 2 条件特化。可用作额外的函数参数(不可应用于运算符重载)、返回类型(不可应用于构造函数与析构函数),或类模板或函数模板形参。 函数参数: #include <iostream> #include <type_tra…...
好用的Web数据库管理工具推荐(ChatGPT的推荐)
在现代数据管理和开发中,Web数据库管理工具变得越来越重要。这些工具不仅提供了直观的用户界面,还支持跨平台操作,方便用户在任何地方进行数据库管理。 目录 1. SQLynx 2. phpMyAdmin 3. Adminer 4. DBeaver 5 结论 以下是几款推荐的Web…...
encoding Token和embedding 傻傻分不清楚?
encoding 编码 “encoding” 是一个在计算机科学和人工智能领域广泛使用的术语,它可以指代多种不同的过程和方法。核心就是编码:用某些数字来表示特定的信息。当然你或许会说字符集(Unicode)更理解这种概念,编码更强调这种动态的过程。而字符…...
一个公用的数据状态修改组件
灵感来自于一项重复的工作,下图中,这类禁用启用、审核通过不通过、设计成是什么状态否什么状态的场景很多。每一个都需要单独提供接口。重复工作还蛮大的。于是,基于该组件类捕获组件跳转写了这款通用接口。省时省力。 代码如下:…...
Chapter03-Authentication vulnerabilities
文章目录 1. 身份验证简介1.1 What is authentication1.2 difference between authentication and authorization1.3 身份验证机制失效的原因1.4 身份验证机制失效的影响 2. 基于登录功能的漏洞2.1 密码爆破2.2 用户名枚举2.3 有缺陷的暴力破解防护2.3.1 如果用户登录尝试失败次…...
React Native 导航系统实战(React Navigation)
导航系统实战(React Navigation) React Navigation 是 React Native 应用中最常用的导航库之一,它提供了多种导航模式,如堆栈导航(Stack Navigator)、标签导航(Tab Navigator)和抽屉…...
深入浅出:JavaScript 中的 `window.crypto.getRandomValues()` 方法
深入浅出:JavaScript 中的 window.crypto.getRandomValues() 方法 在现代 Web 开发中,随机数的生成看似简单,却隐藏着许多玄机。无论是生成密码、加密密钥,还是创建安全令牌,随机数的质量直接关系到系统的安全性。Jav…...
蓝桥杯 2024 15届国赛 A组 儿童节快乐
P10576 [蓝桥杯 2024 国 A] 儿童节快乐 题目描述 五彩斑斓的气球在蓝天下悠然飘荡,轻快的音乐在耳边持续回荡,小朋友们手牵着手一同畅快欢笑。在这样一片安乐祥和的氛围下,六一来了。 今天是六一儿童节,小蓝老师为了让大家在节…...
条件运算符
C中的三目运算符(也称条件运算符,英文:ternary operator)是一种简洁的条件选择语句,语法如下: 条件表达式 ? 表达式1 : 表达式2• 如果“条件表达式”为true,则整个表达式的结果为“表达式1”…...
django filter 统计数量 按属性去重
在Django中,如果你想要根据某个属性对查询集进行去重并统计数量,你可以使用values()方法配合annotate()方法来实现。这里有两种常见的方法来完成这个需求: 方法1:使用annotate()和Count 假设你有一个模型Item,并且你想…...
sqlserver 根据指定字符 解析拼接字符串
DECLARE LotNo NVARCHAR(50)A,B,C DECLARE xml XML ( SELECT <x> REPLACE(LotNo, ,, </x><x>) </x> ) DECLARE ErrorCode NVARCHAR(50) -- 提取 XML 中的值 SELECT value x.value(., VARCHAR(MAX))…...
linux 下常用变更-8
1、删除普通用户 查询用户初始UID和GIDls -l /home/ ###家目录中查看UID cat /etc/group ###此文件查看GID删除用户1.编辑文件 /etc/passwd 找到对应的行,YW343:x:0:0::/home/YW343:/bin/bash 2.将标红的位置修改为用户对应初始UID和GID: YW3…...
JDK 17 新特性
#JDK 17 新特性 /**************** 文本块 *****************/ python/scala中早就支持,不稀奇 String json “”" { “name”: “Java”, “version”: 17 } “”"; /**************** Switch 语句 -> 表达式 *****************/ 挺好的ÿ…...
Typeerror: cannot read properties of undefined (reading ‘XXX‘)
最近需要在离线机器上运行软件,所以得把软件用docker打包起来,大部分功能都没问题,出了一个奇怪的事情。同样的代码,在本机上用vscode可以运行起来,但是打包之后在docker里出现了问题。使用的是dialog组件,…...
