当前位置: 首页 > news >正文

maxwell 基于zookeeper的高可用方案

Maxwell版本1.39.2

一: 添加zk的pox文件

<!-- customize HA -->
<dependency><groupId>org.apache.curator</groupId><artifactId>curator-framework</artifactId><version>5.4.0</version>
</dependency><dependency><groupId>org.apache.curator</groupId><artifactId>curator-recipes</artifactId><version>5.4.0</version>
</dependency>

二: 创建zk工具类

在 com.zendesk.maxwell.util 包下创建 CuratorUtil 类,后面会使用此类实现高可用

package com.zendesk.maxwell.util;import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;public class CuratorUtil {private final String zookeeperServers;private final int sessionTimeoutMs;private final int connectionTimeoutMs;private final int baseSleepTimeMs;private final int maxRetries;private CuratorFramework client;public CuratorUtil(String zookeeperServers, int sessionTimeoutMs, int connectionTimeoutMs, int baseSleepTimeMs,int maxRetries) {this.zookeeperServers = zookeeperServers;this.sessionTimeoutMs = sessionTimeoutMs;this.connectionTimeoutMs = connectionTimeoutMs;this.baseSleepTimeMs = baseSleepTimeMs;this.maxRetries = maxRetries;}/** 构造 zookeeper 客户端,并连接 zookeeper 集群*/public void start() {ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(this.baseSleepTimeMs, this.maxRetries);client = CuratorFrameworkFactory.newClient(this.zookeeperServers,this.sessionTimeoutMs,this.connectionTimeoutMs,retryPolicy);client.start();}/** 实现分布式锁*/public void highAvailable() {// 1.连接 Zookeeper 客户端this.start();// 2.向 zookeeper 注册自己String lockPath = "/maxwell/ha/lock";InterProcessMutex lock = new InterProcessMutex(client, lockPath);try {// 3.获取锁lock.acquire();// 4.将自己信息注册到 leader 路径String leaderPath = "/maxwell/ha/leader";client.create().withMode(CreateMode.EPHEMERAL).forPath(leaderPath);} catch (Exception e) {e.printStackTrace();}}
}

三: 修改 com.zendesk.maxwell 包下的MaxwellConfig类

3.1 添加属性

// 类新增属性
public String zookeeperServers;
public int sessionTimeoutMs;
public int connectionTimeoutMs;
public int baseSleepTimeMs;
public int maxRetries;

3.2 buildOptionParser 方法添加代码

parser.accepts( "zookeeper", "zookeeper servers support maxwell high available" ).withRequiredArg();
parser.accepts( "session_timeout_ms", "session timeout ms with zookeeper" ).withRequiredArg();
parser.accepts( "connection_timeout_ms", "connection timeout ms with zookeeper" ).withRequiredArg();
parser.accepts( "base_sleep_time_ms", "base sleep time ms if retry" ).withRequiredArg();
parser.accepts( "max_retries", "max retry times" ).withRequiredArg();

3.3 setup 方法添加代码

this.haMode = fetchBooleanOption("ha", options, properties, false);
this.zookeeperServers = fetchStringOption("zookeeper",options, properties, null);
this.sessionTimeoutMs = fetchIntegerOption("session_timeout_ms",options, properties, 5000);
this.connectionTimeoutMs = fetchIntegerOption("connection_timeout_ms",options, properties, 5000);
this.baseSleepTimeMs = fetchIntegerOption("base_sleep_time_ms",options, properties, 5000);
this.maxRetries = fetchIntegerOption("max_retries",options, properties, 3);
if (haMode && zookeeperServers == null){LOGGER.error("you must specify --zookeeper because you want to use maxwell in ha mode");
}

四:修改 com.zendesk.maxwell.Maxwell 的main函数

将代码段

if ( config.haMode ) {new MaxwellHA(maxwell, config.jgroupsConf, config.raftMemberID, config.clientID).startHA();
} else {maxwell.start();
}

全部注释掉,修改为

if ( config.haMode ) {CuratorUtil curatorUtil = new CuratorUtil(config.zookeeperServers, config.sessionTimeoutMs, config.connectionTimeoutMs, config.baseSleepTimeMs, config.maxRetries);curatorUtil.highAvailable();
}
maxwell.start();

然后重新打包就能得到基于zk的高可用版本了,打包时可以将test包删除,防止出现错误

源码下载地址

五: 启动脚本

5.1 创建配置文件 config.properties

log_level=info

# mysql login info
host=localhost
port=3306
user=root
password=root123
schema_database=maxwell
# options to pass into the jdbc connection, given as opt=val&opt2=val2
#jdbc_options=opt1=100&opt2=hello

producer=kafka

#       *** kafka ***
producer=kafka
#kafka.bootstrap.servers=hosta:9092,hostb:9092
kafka.bootstrap.servers=localhost:9092
kafka.max.request.size = 104857600

kafka_topic=mysql.%{database}.%{table}
kafka_version=2.7.0

# alternative kafka topic to write DDL (alter/create/drop) to.  Defaults to kafka_topic
#ddl_kafka_topic=maxwell_ddl

# hash function to use.  "default" is just the JVM's 'hashCode' function.
#kafka_partition_hash=default # [default, murmur3]

# how maxwell writes its kafka key.
#
# 'hash' looks like:
# {"database":"test","table":"tickets","pk.id":10001}
#
# 'array' looks like:
# ["test","tickets",[{"id":10001}]]
#
# default: "hash"
#kafka_key_format=hash # [hash, array]

5.2 启动脚本编写 startup.sh

#!/bin/bash

single(){
  bin/maxwell --filter 'exclude: *.*, include: cp.*' --kafka_version=2.7.0 --config=config.properties --daemon
  echo -e "\033[32m单机版启动成功\n\033[0m"
}

ha(){
  ## zookeeper 多个用,分割
  bin/maxwell --filter 'exclude: *.*, include: cp.*' --kafka_version=2.7.0 --config=config.properties --ha --zookeeper=127.0.0.1:2181 --daemon
  echo -e "\033[32m高可用版启动成功\n\033[0m"
}

case "$1" in
  'ha')
     ha
     ;;
  *)
     single
     ;;
esac

5.2.1 高可用版本启动命令

./startup.sh ha

5.2.2 单机版启动命令

./startup.sh


 

相关文章:

maxwell 基于zookeeper的高可用方案

Maxwell版本1.39.2 一&#xff1a; 添加zk的pox文件 <!-- customize HA --> <dependency><groupId>org.apache.curator</groupId><artifactId>curator-framework</artifactId><version>5.4.0</version> </dependency>&…...

【JavaScript】match用法 | 正则匹配

match正则匹配 var e "www.apple.com:baidu.com" var match e.match(/com/g) console.log("match: "match);> "match: com,com"match返回值问题 match的返回值是一个数组 数组的第0个元素是与整个正则表达式匹配的结果 数组的第1个元素是…...

前端css + js +vue +element-ui 实现响应式布局,根据浏览器窗体大小自动响应

前端css js vue element-ui 实现响应式布局&#xff0c;根据浏览器窗体大小自动响应 1、环境2、js代码3、代码解释1、定义对象2、定义方法3、监听窗口变化&#xff0c;计算比例值&#xff0c;并赋值给transform 属性4、实现监听 3、html 代码4、特别注意 1、环境 我的环境是e…...

小程序生成App:轻量低门槛的开发方式

小程序生成App可以成为一种轻量低门槛的开发App的方式&#xff0c;但是需要根据具体情况进行选择。如果应用需要处理大量数据或需要进行复杂计算&#xff0c;或者需要实现原生特有的功能或交互效果&#xff0c;可能需要选择其他开发方式。 在文章开始之前&#xff0c;我们看看目…...

Linux命名管道进程通信

文章目录 前言一、什么是命名管道通信二、创建方式三、代码示例四、文件进程通信总结 前言 命名管道 是实现进程间通信的强大工具&#xff0c;它提供了一种简单而有效的方式&#xff0c;允许不同进程之间进行可靠的数据交换。不仅可以在同一主机上的不相关进程间进行通信&…...

如何将苹果彻底删除视频找回?试试这3种方法

如今是短视频时代&#xff0c;大家通常会使用苹果手机来拍摄视频&#xff0c;以此记录生活中的美好日常。但是大家都知道视频是十分占空间的&#xff0c;这也经常会出现iPhone内存不足&#xff0c;磁盘崩溃的问题。 当遇到iPhone内存不足的情况时&#xff0c;大家往往会选择清…...

【音视频、chatGpt】h5页面最小化后,再激活后视频停住问题的解决

目录 现象 观察 解决 现象 页面有时候要切换&#xff0c;要最小化&#xff1b;短时间或者几个小时内切换回来&#xff0c;视频可以正常续上&#xff1b;而放置较长时间&#xff0c;几个小时或者一晚上&#xff0c;切换回来后&#xff0c;视频可能卡死 观察 切换页面&#x…...

[CSS] 图片九宫格

效果 index.html <!DOCTYPE html> <html lang"en"><head><meta charset"UTF-8"/><meta http-equiv"X-UA-Compatible" content"IEedge"/><meta name"viewport" content"widthdevice-…...

MChat-Gpt V1.0.0 (将ChatGpt机器人接入内网供全体使用)

Github>https://github.com/MartinxMax/MChat-Gpt 首页 MChat-Gpt V1.0.0将ChatGpt机器人接入内网供全体使用 你需要一个ChatGpt账户如果您在中国则需要使用代理访问,设置TUN代理模式 安装依赖 选择你的系统进行安装 服务端配置 #python3 ChatGpt_Server.py -h 使用&a…...

日常开发中Git命令指北

Git基本操作 创建化仓库 mkdir 目录 cd 目录 git init配置本地仓库 # 配置用户名&#xff0c;邮箱 git config user.name "cxf" git config user.email "1969612859qq.com" # 查看本地配置&#xff08;小写的 L&#xff09; git config -l # 重置配置&a…...

API 测试 | 了解 API 接口概念|电商平台 API 接口测试指南

什么是 API&#xff1f; API 是一个缩写&#xff0c;它代表了一个 pplication P AGC 软件覆盖整个房间。API 是用于构建软件应用程序的一组例程&#xff0c;协议和工具。API 指定一个软件程序应如何与其他软件程序进行交互。 例行程序&#xff1a;执行特定任务的程序。例程也称…...

【计算机组成原理】24王道考研笔记——第三章 存储系统

第三章 存储系统 一、存储系统概述 现代计算机的结构&#xff1a; 1.存储器的层次结构 2.存储器的分类 按层次&#xff1a; 按介质&#xff1a; 按存储方式&#xff1a; 按信息的可更改性&#xff1a; 按信息的可保存性&#xff1a; 3.存储器的性能指标 二、主存储器 1.基本…...

学习C语言的好处:

基础编程语言&#xff1a;C语言是其他编程语言的基础&#xff0c;学习C语言可为后续学习打下坚实基础&#xff0c;广泛应用于嵌入式系统、操作系统、网络协议等。 简单易学&#xff1a;C语言语法简单易懂&#xff0c;适合初学者。只需文本编辑器和编译器&#xff0c;即可开始编…...

基于k8s的devOps自动化运维平台架构设计(中英文版本)

▲ 点击上方"DevOps和k8s全栈技术"关注公众号 In the rapidly evolving landscape of software development and IT operations, DevOps has emerged as a transformative approach to bridge the gap between development and operations teams. One of the key ena…...

P450进阶款无人机室内定位功能研测

在以往的Prometheus 450&#xff08;P450&#xff09;无人机上&#xff0c;我们搭载的是Intel Realsense T265定位模块&#xff0c;使用USB连接方式挂载到机载计算机allspark上&#xff0c;通过机载上SDK驱动T265运行并输出SLAM信息&#xff0c;以此来实现室内定位功能。 为进…...

深度学习,计算机视觉任务

目录 计算机视觉任务 1.K近邻算法 2.得分函数 3.损失函数的作用 4.向前传播整体流程 5.反向传播计算方法 计算机视觉任务 机器学习的流程&#xff1a; 数据获取 特征工程 建立模型 评估与应用 计算机视觉&#xff1a; 图像表示&#xff1a;计算机眼中的图像&#…...

使用 Docker 部署 canal 服务实现MySQL和ES实时同步

文章目录 0. 环境介绍0. 前置步骤1. 安装Kibana和Elasticsearch2. 安装Canal和Canal Adapter2.1 修改数据库配置2.1.1 修改配置2.1.2 验证mysql binlog配置2.1.3 查看日志文件2.1.4 用JDBC代码插入数据库 2.2 安装Canal Server2.3 安装Canal Adapter修改两处配置文件配置文件取…...

const易错详解

const对比 常量指针 int b; (1)const int *a &b;//常量指针(2)int const *a &b; //常量指针常量指针&#xff1a;指向的变量值不能被修改 ![常量指针](https://img-blog.csdnimg.cn/9d795b11eb6d484297ea7cbead28463f.png 指针常量 int b; int* const a&b;…...

网络安全—黑客技术【自学】

一、黑客是什么 原是指热心于计算机技术&#xff0c;水平高超的电脑专家&#xff0c;尤其是程序设计人员。但后来&#xff0c;黑客一词已被用于泛指那些专门利用电脑网络搞破坏或者恶作剧的家伙。 二、学习黑客技术的原因 其实&#xff0c;网络信息空间安全已经成为海陆空之…...

作为数据产品经理的一天

数据产品经理作为这两年大数据行业的热门职业&#xff0c;经常有小伙伴会问我数据产品经理是做什么的&#xff0c;给大家简单讲下作为数据产品经理的一天是怎么度过得&#xff0c;算是一篇记录文吧&#xff0c;看完或许大家对这个职业的了解会更深入一些。 01 早上10点&#…...

深度学习在微纳光子学中的应用

深度学习在微纳光子学中的主要应用方向 深度学习与微纳光子学的结合主要集中在以下几个方向&#xff1a; 逆向设计 通过神经网络快速预测微纳结构的光学响应&#xff0c;替代传统耗时的数值模拟方法。例如设计超表面、光子晶体等结构。 特征提取与优化 从复杂的光学数据中自…...

PHP和Node.js哪个更爽?

先说结论&#xff0c;rust完胜。 php&#xff1a;laravel&#xff0c;swoole&#xff0c;webman&#xff0c;最开始在苏宁的时候写了几年php&#xff0c;当时觉得php真的是世界上最好的语言&#xff0c;因为当初活在舒适圈里&#xff0c;不愿意跳出来&#xff0c;就好比当初活在…...

阿里云ACP云计算备考笔记 (5)——弹性伸缩

目录 第一章 概述 第二章 弹性伸缩简介 1、弹性伸缩 2、垂直伸缩 3、优势 4、应用场景 ① 无规律的业务量波动 ② 有规律的业务量波动 ③ 无明显业务量波动 ④ 混合型业务 ⑤ 消息通知 ⑥ 生命周期挂钩 ⑦ 自定义方式 ⑧ 滚的升级 5、使用限制 第三章 主要定义 …...

FFmpeg 低延迟同屏方案

引言 在实时互动需求激增的当下&#xff0c;无论是在线教育中的师生同屏演示、远程办公的屏幕共享协作&#xff0c;还是游戏直播的画面实时传输&#xff0c;低延迟同屏已成为保障用户体验的核心指标。FFmpeg 作为一款功能强大的多媒体框架&#xff0c;凭借其灵活的编解码、数据…...

电脑插入多块移动硬盘后经常出现卡顿和蓝屏

当电脑在插入多块移动硬盘后频繁出现卡顿和蓝屏问题时&#xff0c;可能涉及硬件资源冲突、驱动兼容性、供电不足或系统设置等多方面原因。以下是逐步排查和解决方案&#xff1a; 1. 检查电源供电问题 问题原因&#xff1a;多块移动硬盘同时运行可能导致USB接口供电不足&#x…...

新能源汽车智慧充电桩管理方案:新能源充电桩散热问题及消防安全监管方案

随着新能源汽车的快速普及&#xff0c;充电桩作为核心配套设施&#xff0c;其安全性与可靠性备受关注。然而&#xff0c;在高温、高负荷运行环境下&#xff0c;充电桩的散热问题与消防安全隐患日益凸显&#xff0c;成为制约行业发展的关键瓶颈。 如何通过智慧化管理手段优化散…...

Spring Boot面试题精选汇总

&#x1f91f;致敬读者 &#x1f7e9;感谢阅读&#x1f7e6;笑口常开&#x1f7ea;生日快乐⬛早点睡觉 &#x1f4d8;博主相关 &#x1f7e7;博主信息&#x1f7e8;博客首页&#x1f7eb;专栏推荐&#x1f7e5;活动信息 文章目录 Spring Boot面试题精选汇总⚙️ **一、核心概…...

浅谈不同二分算法的查找情况

二分算法原理比较简单&#xff0c;但是实际的算法模板却有很多&#xff0c;这一切都源于二分查找问题中的复杂情况和二分算法的边界处理&#xff0c;以下是博主对一些二分算法查找的情况分析。 需要说明的是&#xff0c;以下二分算法都是基于有序序列为升序有序的情况&#xf…...

Device Mapper 机制

Device Mapper 机制详解 Device Mapper&#xff08;简称 DM&#xff09;是 Linux 内核中的一套通用块设备映射框架&#xff0c;为 LVM、加密磁盘、RAID 等提供底层支持。本文将详细介绍 Device Mapper 的原理、实现、内核配置、常用工具、操作测试流程&#xff0c;并配以详细的…...

MySQL JOIN 表过多的优化思路

当 MySQL 查询涉及大量表 JOIN 时&#xff0c;性能会显著下降。以下是优化思路和简易实现方法&#xff1a; 一、核心优化思路 减少 JOIN 数量 数据冗余&#xff1a;添加必要的冗余字段&#xff08;如订单表直接存储用户名&#xff09;合并表&#xff1a;将频繁关联的小表合并成…...