maxwell 基于zookeeper的高可用方案
Maxwell版本1.39.2
一: 添加zk的pox文件
<!-- customize HA --> <dependency><groupId>org.apache.curator</groupId><artifactId>curator-framework</artifactId><version>5.4.0</version> </dependency><dependency><groupId>org.apache.curator</groupId><artifactId>curator-recipes</artifactId><version>5.4.0</version> </dependency>
二: 创建zk工具类
在 com.zendesk.maxwell.util 包下创建 CuratorUtil 类,后面会使用此类实现高可用
package com.zendesk.maxwell.util;import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;public class CuratorUtil {private final String zookeeperServers;private final int sessionTimeoutMs;private final int connectionTimeoutMs;private final int baseSleepTimeMs;private final int maxRetries;private CuratorFramework client;public CuratorUtil(String zookeeperServers, int sessionTimeoutMs, int connectionTimeoutMs, int baseSleepTimeMs,int maxRetries) {this.zookeeperServers = zookeeperServers;this.sessionTimeoutMs = sessionTimeoutMs;this.connectionTimeoutMs = connectionTimeoutMs;this.baseSleepTimeMs = baseSleepTimeMs;this.maxRetries = maxRetries;}/** 构造 zookeeper 客户端,并连接 zookeeper 集群*/public void start() {ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(this.baseSleepTimeMs, this.maxRetries);client = CuratorFrameworkFactory.newClient(this.zookeeperServers,this.sessionTimeoutMs,this.connectionTimeoutMs,retryPolicy);client.start();}/** 实现分布式锁*/public void highAvailable() {// 1.连接 Zookeeper 客户端this.start();// 2.向 zookeeper 注册自己String lockPath = "/maxwell/ha/lock";InterProcessMutex lock = new InterProcessMutex(client, lockPath);try {// 3.获取锁lock.acquire();// 4.将自己信息注册到 leader 路径String leaderPath = "/maxwell/ha/leader";client.create().withMode(CreateMode.EPHEMERAL).forPath(leaderPath);} catch (Exception e) {e.printStackTrace();}}
}
三: 修改 com.zendesk.maxwell 包下的MaxwellConfig类
3.1 添加属性
// 类新增属性
public String zookeeperServers;
public int sessionTimeoutMs;
public int connectionTimeoutMs;
public int baseSleepTimeMs;
public int maxRetries;
3.2 buildOptionParser 方法添加代码
parser.accepts( "zookeeper", "zookeeper servers support maxwell high available" ).withRequiredArg(); parser.accepts( "session_timeout_ms", "session timeout ms with zookeeper" ).withRequiredArg(); parser.accepts( "connection_timeout_ms", "connection timeout ms with zookeeper" ).withRequiredArg(); parser.accepts( "base_sleep_time_ms", "base sleep time ms if retry" ).withRequiredArg(); parser.accepts( "max_retries", "max retry times" ).withRequiredArg();
3.3 setup 方法添加代码
this.haMode = fetchBooleanOption("ha", options, properties, false);
this.zookeeperServers = fetchStringOption("zookeeper",options, properties, null);
this.sessionTimeoutMs = fetchIntegerOption("session_timeout_ms",options, properties, 5000);
this.connectionTimeoutMs = fetchIntegerOption("connection_timeout_ms",options, properties, 5000);
this.baseSleepTimeMs = fetchIntegerOption("base_sleep_time_ms",options, properties, 5000);
this.maxRetries = fetchIntegerOption("max_retries",options, properties, 3);
if (haMode && zookeeperServers == null){LOGGER.error("you must specify --zookeeper because you want to use maxwell in ha mode");
}
四:修改 com.zendesk.maxwell.Maxwell 的main函数
将代码段
if ( config.haMode ) {new MaxwellHA(maxwell, config.jgroupsConf, config.raftMemberID, config.clientID).startHA();
} else {maxwell.start();
}
全部注释掉,修改为
if ( config.haMode ) {CuratorUtil curatorUtil = new CuratorUtil(config.zookeeperServers, config.sessionTimeoutMs, config.connectionTimeoutMs, config.baseSleepTimeMs, config.maxRetries);curatorUtil.highAvailable();
}
maxwell.start();
然后重新打包就能得到基于zk的高可用版本了,打包时可以将test包删除,防止出现错误
源码下载地址
五: 启动脚本
5.1 创建配置文件 config.properties
log_level=info
# mysql login info
host=localhost
port=3306
user=root
password=root123
schema_database=maxwell
# options to pass into the jdbc connection, given as opt=val&opt2=val2
#jdbc_options=opt1=100&opt2=hello
producer=kafka
# *** kafka ***
producer=kafka
#kafka.bootstrap.servers=hosta:9092,hostb:9092
kafka.bootstrap.servers=localhost:9092
kafka.max.request.size = 104857600
kafka_topic=mysql.%{database}.%{table}
kafka_version=2.7.0
# alternative kafka topic to write DDL (alter/create/drop) to. Defaults to kafka_topic
#ddl_kafka_topic=maxwell_ddl
# hash function to use. "default" is just the JVM's 'hashCode' function.
#kafka_partition_hash=default # [default, murmur3]
# how maxwell writes its kafka key.
#
# 'hash' looks like:
# {"database":"test","table":"tickets","pk.id":10001}
#
# 'array' looks like:
# ["test","tickets",[{"id":10001}]]
#
# default: "hash"
#kafka_key_format=hash # [hash, array]
5.2 启动脚本编写 startup.sh
#!/bin/bash
single(){
bin/maxwell --filter 'exclude: *.*, include: cp.*' --kafka_version=2.7.0 --config=config.properties --daemon
echo -e "\033[32m单机版启动成功\n\033[0m"
}
ha(){
## zookeeper 多个用,分割
bin/maxwell --filter 'exclude: *.*, include: cp.*' --kafka_version=2.7.0 --config=config.properties --ha --zookeeper=127.0.0.1:2181 --daemon
echo -e "\033[32m高可用版启动成功\n\033[0m"
}
case "$1" in
'ha')
ha
;;
*)
single
;;
esac
5.2.1 高可用版本启动命令
./startup.sh ha
5.2.2 单机版启动命令
./startup.sh
相关文章:
maxwell 基于zookeeper的高可用方案
Maxwell版本1.39.2 一: 添加zk的pox文件 <!-- customize HA --> <dependency><groupId>org.apache.curator</groupId><artifactId>curator-framework</artifactId><version>5.4.0</version> </dependency>&…...
【JavaScript】match用法 | 正则匹配
match正则匹配 var e "www.apple.com:baidu.com" var match e.match(/com/g) console.log("match: "match);> "match: com,com"match返回值问题 match的返回值是一个数组 数组的第0个元素是与整个正则表达式匹配的结果 数组的第1个元素是…...
前端css + js +vue +element-ui 实现响应式布局,根据浏览器窗体大小自动响应
前端css js vue element-ui 实现响应式布局,根据浏览器窗体大小自动响应 1、环境2、js代码3、代码解释1、定义对象2、定义方法3、监听窗口变化,计算比例值,并赋值给transform 属性4、实现监听 3、html 代码4、特别注意 1、环境 我的环境是e…...
小程序生成App:轻量低门槛的开发方式
小程序生成App可以成为一种轻量低门槛的开发App的方式,但是需要根据具体情况进行选择。如果应用需要处理大量数据或需要进行复杂计算,或者需要实现原生特有的功能或交互效果,可能需要选择其他开发方式。 在文章开始之前,我们看看目…...
Linux命名管道进程通信
文章目录 前言一、什么是命名管道通信二、创建方式三、代码示例四、文件进程通信总结 前言 命名管道 是实现进程间通信的强大工具,它提供了一种简单而有效的方式,允许不同进程之间进行可靠的数据交换。不仅可以在同一主机上的不相关进程间进行通信&…...
如何将苹果彻底删除视频找回?试试这3种方法
如今是短视频时代,大家通常会使用苹果手机来拍摄视频,以此记录生活中的美好日常。但是大家都知道视频是十分占空间的,这也经常会出现iPhone内存不足,磁盘崩溃的问题。 当遇到iPhone内存不足的情况时,大家往往会选择清…...
【音视频、chatGpt】h5页面最小化后,再激活后视频停住问题的解决
目录 现象 观察 解决 现象 页面有时候要切换,要最小化;短时间或者几个小时内切换回来,视频可以正常续上;而放置较长时间,几个小时或者一晚上,切换回来后,视频可能卡死 观察 切换页面&#x…...
[CSS] 图片九宫格
效果 index.html <!DOCTYPE html> <html lang"en"><head><meta charset"UTF-8"/><meta http-equiv"X-UA-Compatible" content"IEedge"/><meta name"viewport" content"widthdevice-…...
MChat-Gpt V1.0.0 (将ChatGpt机器人接入内网供全体使用)
Github>https://github.com/MartinxMax/MChat-Gpt 首页 MChat-Gpt V1.0.0将ChatGpt机器人接入内网供全体使用 你需要一个ChatGpt账户如果您在中国则需要使用代理访问,设置TUN代理模式 安装依赖 选择你的系统进行安装 服务端配置 #python3 ChatGpt_Server.py -h 使用&a…...
日常开发中Git命令指北
Git基本操作 创建化仓库 mkdir 目录 cd 目录 git init配置本地仓库 # 配置用户名,邮箱 git config user.name "cxf" git config user.email "1969612859qq.com" # 查看本地配置(小写的 L) git config -l # 重置配置&a…...
API 测试 | 了解 API 接口概念|电商平台 API 接口测试指南
什么是 API? API 是一个缩写,它代表了一个 pplication P AGC 软件覆盖整个房间。API 是用于构建软件应用程序的一组例程,协议和工具。API 指定一个软件程序应如何与其他软件程序进行交互。 例行程序:执行特定任务的程序。例程也称…...
【计算机组成原理】24王道考研笔记——第三章 存储系统
第三章 存储系统 一、存储系统概述 现代计算机的结构: 1.存储器的层次结构 2.存储器的分类 按层次: 按介质: 按存储方式: 按信息的可更改性: 按信息的可保存性: 3.存储器的性能指标 二、主存储器 1.基本…...
学习C语言的好处:
基础编程语言:C语言是其他编程语言的基础,学习C语言可为后续学习打下坚实基础,广泛应用于嵌入式系统、操作系统、网络协议等。 简单易学:C语言语法简单易懂,适合初学者。只需文本编辑器和编译器,即可开始编…...
基于k8s的devOps自动化运维平台架构设计(中英文版本)
▲ 点击上方"DevOps和k8s全栈技术"关注公众号 In the rapidly evolving landscape of software development and IT operations, DevOps has emerged as a transformative approach to bridge the gap between development and operations teams. One of the key ena…...
P450进阶款无人机室内定位功能研测
在以往的Prometheus 450(P450)无人机上,我们搭载的是Intel Realsense T265定位模块,使用USB连接方式挂载到机载计算机allspark上,通过机载上SDK驱动T265运行并输出SLAM信息,以此来实现室内定位功能。 为进…...
深度学习,计算机视觉任务
目录 计算机视觉任务 1.K近邻算法 2.得分函数 3.损失函数的作用 4.向前传播整体流程 5.反向传播计算方法 计算机视觉任务 机器学习的流程: 数据获取 特征工程 建立模型 评估与应用 计算机视觉: 图像表示:计算机眼中的图像&#…...
使用 Docker 部署 canal 服务实现MySQL和ES实时同步
文章目录 0. 环境介绍0. 前置步骤1. 安装Kibana和Elasticsearch2. 安装Canal和Canal Adapter2.1 修改数据库配置2.1.1 修改配置2.1.2 验证mysql binlog配置2.1.3 查看日志文件2.1.4 用JDBC代码插入数据库 2.2 安装Canal Server2.3 安装Canal Adapter修改两处配置文件配置文件取…...
const易错详解
const对比 常量指针 int b; (1)const int *a &b;//常量指针(2)int const *a &b; //常量指针常量指针:指向的变量值不能被修改 的获取与展示,已难以满足市场对个性化、智能…...
DAY 47
三、通道注意力 3.1 通道注意力的定义 # 新增:通道注意力模块(SE模块) class ChannelAttention(nn.Module):"""通道注意力模块(Squeeze-and-Excitation)"""def __init__(self, in_channels, reduction_rat…...
视觉slam十四讲实践部分记录——ch2、ch3
ch2 一、使用g++编译.cpp为可执行文件并运行(P30) g++ helloSLAM.cpp ./a.out运行 二、使用cmake编译 mkdir build cd build cmake .. makeCMakeCache.txt 文件仍然指向旧的目录。这表明在源代码目录中可能还存在旧的 CMakeCache.txt 文件,或者在构建过程中仍然引用了旧的路…...
Selenium常用函数介绍
目录 一,元素定位 1.1 cssSeector 1.2 xpath 二,操作测试对象 三,窗口 3.1 案例 3.2 窗口切换 3.3 窗口大小 3.4 屏幕截图 3.5 关闭窗口 四,弹窗 五,等待 六,导航 七,文件上传 …...
Vite中定义@软链接
在webpack中可以直接通过符号表示src路径,但是vite中默认不可以。 如何实现: vite中提供了resolve.alias:通过别名在指向一个具体的路径 在vite.config.js中 import { join } from pathexport default defineConfig({plugins: [vue()],//…...
C语言中提供的第三方库之哈希表实现
一. 简介 前面一篇文章简单学习了C语言中第三方库(uthash库)提供对哈希表的操作,文章如下: C语言中提供的第三方库uthash常用接口-CSDN博客 本文简单学习一下第三方库 uthash库对哈希表的操作。 二. uthash库哈希表操作示例 u…...
VisualXML全新升级 | 新增数据库编辑功能
VisualXML是一个功能强大的网络总线设计工具,专注于简化汽车电子系统中复杂的网络数据设计操作。它支持多种主流总线网络格式的数据编辑(如DBC、LDF、ARXML、HEX等),并能够基于Excel表格的方式生成和转换多种数据库文件。由此&…...
前端高频面试题2:浏览器/计算机网络
本专栏相关链接 前端高频面试题1:HTML/CSS 前端高频面试题2:浏览器/计算机网络 前端高频面试题3:JavaScript 1.什么是强缓存、协商缓存? 强缓存: 当浏览器请求资源时,首先检查本地缓存是否命中。如果命…...
React父子组件通信:Props怎么用?如何从父组件向子组件传递数据?
系列回顾: 在上一篇《React核心概念:State是什么?》中,我们学习了如何使用useState让一个组件拥有自己的内部数据(State),并通过一个计数器案例,实现了组件的自我更新。这很棒&#…...
