火山引擎 ByteHouse:ClickHouse 如何保证海量数据一致性
背景
ClickHouse是一个开源的OLAP引擎,不仅被全球开发者广泛使用,在字节各个应用场景中也可以看到它的身影。基于高性能、分布式特点,ClickHouse可以满足大规模数据的分析和查询需求,因此字节研发团队以开源ClickHouse为基础,推出火山引擎云原生数据仓库ByteHouse。
在日常工作中,研发人员经常会遇到业务链路过长,导致流程稳定性和数据一致性难保障的问题,这在分布式、跨服务的场景中更为明显。本篇文章提出针对这一问题的解决思路:在火山引擎ByteHouse中构建轻量级流程引擎,来解决数据一致性问题。
使用轻量级流程引擎可以帮我们使用统一的标准来解决复杂业务链路的编排问题,不仅提高业务代码的可读性和复用性,还能更专注业务核心逻辑的开发,让整体流程更加标准化、规范化。
总结来说,使用流程引擎有以下优势:
轻量级,接入方便,内存操作,性能有保障
易维护,流程配置与业务分离,支持热更新
易扩展,丰富的执行策略及算子支持
大体思路

上图为ByteHouse企业版管理平台功能架构图。从该功能架构图可以看出,ByteHouse核心能力都是依赖ClickHouse集群,对于集群节点多、数据计算量大的业务场景,容易出现节点状态不一致的问题,因此保证ClickHouse集群间的状态一致性是我们的核心诉求。

为了保证数据一致性,ByteHouse提供了以下能力:
event engine: 事件处理中心
workflow engine:轻量级流程引擎
对账系统
保障数据一致性最简单的方式是通过状态机来监听流程执行过程:
首先,将所有的任务请求下发到event engine,由event engine将任务分发对应的handler执行,统一管理所有下发任务的生命周期,并提供异步重试、回滚补偿等功能。流量汇总到event engine以后,会让服务后续的业务扩展更加便捷。
其次,对于比较复杂的任务请求,我们可以下发到workflow engine执行,由workflow生成实例,并编排任务队列,管理流程执行实例的生命周期,统一失败回滚,失败重试。
最后,对于服务不可用等特殊场景产生的脏数据,由对账服务兜底。

架构设计
在流程监控的架构设计中,主要包含以下:
流程管理层:主要负责流程配置的解析初始化,并完成编排策略的工作
策略behavior层:编排执行节点,并下发执行任务到执行器
执行器:管理执行节点执行
执行节点:负责业务具体实现

实现方案
执行节点

流程引擎的核心为“责任链”,按照责任链上的节点顺序依次执行所有任务,所以我们需要的三个基本单元分别为:
request:入参
processlist:流程执行节点list
response:出参
在研发工作中,我们时常会遇到以下问题:
如果同时出现了一个问题,node1、node2、node3之间的数据交互如何实现?
如果node1入参、出参与node2,node3不一样该如何处理?
参数类型不同的node又该如何统一调度?
最简单的处理办法,是让node使用相同的上下文信息,将整个执行node模版化。我们让所有的执行节点node实现相同的接口Delegation,统一使用相同的上下文executionContext作为执行方法的入参。
对于流程中的request和response,我们可以放入executionContext中,让每个执行节点都可以通过上下文操作response。
// Delegation -
type Delegation interface {Execute(ctx context.Context, executionContext ExecutionContextInterface) apperror.AppErrorTryExecute(ctx context.Context, executionContext ExecutionContextInterface) apperror.AppErrorConfirmExecute(ctx context.Context, executionContext ExecutionContextInterface) apperror.AppErrorCancelExecute(ctx context.Context, executionContext ExecutionContextInterface) apperror.AppErrorCode() stringType() value.DelegationType
}
执行策略
如果确定好了最小的执行节点,我们需要考虑到,业务场景并不会永远顺序执行node,再返回结果,流程执行过程中跳转、循环、并发执行都是比较常见的操作。考虑不同业务场景复用性,我们在执行节点之上加了一层执行策略,用策略behaivor来重新编排触发执行节点的任务。
下图将流程分成了behavior1和behavior2,分别对应不同的策略。
简单的策略举例:按顺序执行、并发执行、循环执行、条件跳转执行等。
我们可以根据自身业务实际需要定制,后续会有实例介绍。

// ActivityBehavior -
type ActivityBehavior interface {Enter(ctx context.Context, executionContext ExecutionContextInterface, pvmActivity PvmActivity) apperror.AppErrorExecute(ctx context.Context, executionContext ExecutionContextInterface, pvmActivity PvmActivity) apperror.AppErrorLeave(ctx context.Context, executionContext ExecutionContextInterface, pvmActivity PvmActivity) apperror.AppErrorCode() value.ActivityBehaviorCode
}
策略behavior提供有Enter,Execute,Leave三个接口,Enter负责生成执行节点任务instance,Execute负责编排并触发执行任务instance操作,Leave负责跳转到下一个behavior。
可以看出来策略behaivor的跳转方式类似于链表,不断执行next方法,所以编码过程中需要注意不要出现死循环,小心stackoverflow。
Executor
执行器Executor的主要作用是串联执行策略和执行节点,策略behavior将执行的命令下发给Executor,由Executor对执行节点的触发操作。这里会根据执行节点的type,映射到三种执行节点的执行方式,包含tcc,执行一次,重试多次。
// DelegationExecutor -
type DelegationExecutor interface {execute(ctx context.Context, executionContext ExecutionContextInterface) apperror.AppErrorpostExecute(ctx context.Context, executionContext ExecutionContextInterface) apperror.AppError
}func (de *DefaultDelegationExecutor) execute(ctx context.Context, executionContext ExecutionContextInterface) apperror.AppError {delegationCode := executionContext.GetExecutionInstance().GetDelegationCode()if len(delegationCode) == 0 || de.DelegationMap[delegationCode] == nil {logger.Info(ctx, "DefaultDelegationExecutor delegation code not found,use default delegation", zap.String("delegationCode", delegationCode))delegationCode = string(value.DefaultDelegation)executionContext.GetExecutionInstance().SetDelegationCode(delegationCode)}return de.dumpExecute(ctx, executionContext, delegationCode)
}func (de *DefaultDelegationExecutor) dumpExecute(ctx context.Context, executionContext ExecutionContextInterface, delegationCode string) apperror.AppError {FireEvent(ctx, executionContext, value.ExecutionStart)var err apperror.AppErrordelegation := de.DelegationMap[delegationCode]switch delegation.Type() {case value.TccDelegation:err = tccExecute(ctx, executionContext, delegation)case value.SingleDelegation:err = singleExecute(ctx, executionContext, delegation)case value.RetryDelegation:err = retryExecute(ctx, executionContext, delegation)}if err != nil {logger.Error(ctx, "delegation.Execute_err", zap.Error(err))return apperror.Trace(err)}FireEvent(ctx, executionContext, value.ExecutionEnd)return nil
}
ExecutionContext
ExecutionContext上下文是用来记录了流程执行的所有细节,包含以下:
ProcessEngineConfigurationInterface: 流程定义信息
ExecutionInstanceInterface: 执行节点实例
ActivityInstanceInterface: 执行策略实例
ProcessInstanceInterface: 流程实例
request:入参
response:返回值
为了保证整个流程执行的稳定性,这里除了response之外,所以其他的实例参数都不建议开放写接口,response可以用来存储流程实例执行过程中会产生的变量信息。
对于整个流程的定义ProcessEngineConfiguration,我们可以选择最简单的方式,即在数据库里,将配置信息映射成json字符串。当然也可以选择读取配置文件,只要能满足读取方便,数据不丢即可。
// ExecutionContextInterface -
type ExecutionContextInterface interface {GetProcessEngineConfiguration() ProcessEngineConfigurationInterfaceSetProcessEngineConfiguration(processEngineConfiguration ProcessEngineConfigurationInterface)GetExecutionInstance() instance.ExecutionInstanceInterfaceSetExecutionInstance(executionInstance instance.ExecutionInstanceInterface)GetActivityInstance() instance.ActivityInstanceInterfaceSetActivityInstance(activityInstance instance.ActivityInstanceInterface)GetProcessInstance() instance.ProcessInstanceInterfaceSetProcessInstance(processInstance instance.ProcessInstanceInterface)SetNeedPause(needPause bool)IsNeedPause() boolSetActivityIndex(activityIndex int)GetActivityIndex() intSetActivityBehaviorCode(activityBehaviorCode value.ActivityBehaviorCode)GetActivityBehaviorCode() value.ActivityBehaviorCodeSetBizUniqueKey(bizUniqueKey string)GetBizUniqueKey() stringGetRequest() map[string]interface{}SetRequest(request map[string]interface{})GetResponse() map[string]stringSetResponse(response map[string]string)AtomicAddResponse(key string, value string)
}
Listener
监听器的主要作用是用来监听流程执行中的重要参数信息。从上述executor接口可以看到fireEvent,它的作用是发送消息event,让listener监听到对应的event类型,完成一些定制化的行为。
类似于面向切面编程,我们可以在执行节点的前后增加定制化的逻辑,如打日志、监听节点执行时间,持久化流程中产生的response信息、增加链路追踪等。
API

最后,我们将上述的内容拼接串联起来,主要提供三个接口:
Start: 启动流程
Signal: 暂停或是异常退出后,继续执行流程
Abort: 强制中断流程
process start(){//1.get and create ProcessEngineConfigurationInterface 解析流程定义//2.create processInstance 创建流程实例//3.create ExecutionContext 创建执行上下文//4. lockstrategy trylock //5. invoke process start processinstance.start()//6. persist processInstance and return//7. lockstrategy unlock
}processinstance start(){// get behavior// behavior enterbehavior.Enter(ctx, executionContext)//behavior executebehavior.Execute(ctx, executionContext)//behavior leavebehavior.Leave(ctx, executionContext)
}
相比于start,signal需要读取执行的细节信息,找到之前失败的执行节点位置,并加载到上下文中,再继续执行。
对于失败节点信息的持久化有两种方式:第一,可以选择在流程执行结束持久化;第二,可以通过listener在每个执行节点结束持久化。具体根据实际业务场景对于性能、数据一致性的要求做出抉择。
并发场景考虑
behavior策略中肯定会出现定制、并发、处理多个执行节点到场景的问题,如果同时修改必定会造成数据错乱。简单的方法推荐使用带锁的容器存储,可以被修改的信息(response),此处使用的是github.com/bytedance/gopkg包里面封装的skipmap。
lockstrategy可以自己定义最适配业务场景的,最简单的方案是redis锁,同时也考虑到系统异常退出后的恢复问题。可以参考redis官网解决特殊情况下的锁异常解决方案:https://redis.io/commands/setnx/
后续的工作
轻量级流程引擎的基本功能到此已经实现,后续的扩展优化可以围绕以下方向进行:
界面化展示,可以将链路执行情况展示出来
策略behavior维度扩展,适配各种业务场景
增加子流程的维度,可以复用原先的执行逻辑
Demo示例
以下为简单的processconfiguration的配置信息,此处使用DefaultBehavior,即同步顺序执行策略。
{"ProcessContentList":[{"Behavior":"DefaultBehavior","DelegationList":[{"Code":"sample1"},{"Code":"sample2"},{"Code":"sample3"}]},{"Behavior":"DefaultBehavior","DelegationList":[{"Code":"sample4"},{"Code":"sample5"}]}]
}

在listener里面加入日志,这样可以追溯出整个流程的执行流程,以便更好的监控整个流程的运行状态。
实际使用
以ClickHouse集群缩容为例:

{"ProcessContentList":[// 查询所有需要重分布的table{"Behavior":"DefaultBehavior",// 顺序执行"DelegationList":[{"Code":"hor_reshard_table_loop" }]},// 遍历所有table进行数据的重分布 {"LoopKey":"reshard_table_loop_key","Behavior":"NonBlockLoopBehavior",// 非阻塞循环处理"DelegationList":[{"Code":"hor_reshard_table"}]},// 进行删除节点操作{"Behavior":"DefaultBehavior","DelegationList":[{"Code":"hor_start_remove_node"},{"Code":"hor_prepare_node_vcloud","PostCode":"hor_rollback_remove_node_vcloud"// 统一失败回滚处理},{"Code":"hor_update_config_vcloud","PostCode":"hor_rollback_remove_node_vcloud"},{"Code":"hor_set_cluster_running","PostCode":"hor_rollback_remove_node_vcloud"},{"Code":"hor_release_node"},{"Code":"hor_callback_bill"}]}]
}
总结
一个流程引擎适配所有的业务场景几乎是不可能,除非接受复杂的方案设计,而第三方流程引擎对于日常的业务开发显得太笨重。轻量级流程引擎则会简化接入方式,减少了过多http请求带来的性能损耗,更加灵活多变,追述问题也变得简单。
在ByteHouse中加入流程引擎的能力,能以较小的代价给业务更多重试的可能性,而不需要反复回滚,特别对于耗时很长的任务,能带来更好用户使用体验。除此之外,流程引擎还能将业务流程模版化,增加接口服务的复用性,使得业务代码的可读性、扩展性得到提升,方便后期维护。
火山引擎云原生数据仓库ByteHouse是火山引擎旗下的一款云原生数据仓库,为用户提供极速分析体验,能够支撑实时数据分析和海量数据离线分析,同时还具备便捷的弹性扩缩容能力,极致分析性能和丰富的企业级特性,助力客户数字化转型。
相关文章:

火山引擎 ByteHouse:ClickHouse 如何保证海量数据一致性
背景 ClickHouse是一个开源的OLAP引擎,不仅被全球开发者广泛使用,在字节各个应用场景中也可以看到它的身影。基于高性能、分布式特点,ClickHouse可以满足大规模数据的分析和查询需求,因此字节研发团队以开源ClickHouse为基础&…...
hashmap使用
hashmap作为dao对象存储数据库数据 list是把每一个数据库的字段都映射了,而hashmap则是唯一id:数据库字段作为key hashmap遍历方式 public class Main {//使用迭代器(Iterator)EntrySetpublic static void main(String[] args) {// 创建并赋…...

Centos7配置国内yum源
目录 备份原系统中的repo文件配置国内开源镜像重新生成yum缓存 备份原系统中的repo文件 cd /etc/yum.repos.d/mkdir repo_bakmv *.repo repo_bak/配置国内开源镜像 到网易和阿里开源镜像站点下载系统对应版本的repo文件 curl -O http://mirrors.aliyun.com/repo/Centos-7.re…...

C#中async/await的线程ID变化情况
一、简单的起步 Console.WriteLine($"主线程开始ID:{Thread.CurrentThread.ManagedThreadId}");//aawait Task.Delay(100);//cConsole.WriteLine($"主线程结束ID:{Environment.CurrentManagedThreadId}");//b 结果: …...

网络安全—黑客技术—自学笔记
目录梗概 一、自学网络安全学习的误区和陷阱 二、学习网络安全的一些前期准备 三、网络安全学习路线 四、学习资料的推荐 想自学网络安全(黑客技术)首先你得了解什么是网络安全!什么是黑客! 网络安全可以基于攻击和防御视角来…...

功夫再高也怕菜刀。多年经验,会独立开发的机器视觉工程师,技术太强,但是找工作能力差劲
功夫再高也怕菜刀,专业的事情交给专业的人去做。 今年7月份中旬的时候,遇到一位老朋友,向我咨询某公司的信息,其实我根本不了解这家公司的情况与实力,向他说了,抱歉,我查下,等我晚上…...
numpy的多项式函数: `poly1d`
Python numpy.poly1d() numpy.poly1d()函数有助于定义一个多项式函数。它使得在多项式上应用 "自然操作 "变得容易。 语法: numpy.poly1d (arr, root, var) 参数 : arr : [array_like] 多项式系数按照幂的递减顺序给出。如果第二个参数(根)被…...

Python灰帽编程——错误异常处理和面向对象
文章目录 1. 错误和异常1.1 基本概念1.1.1 Python 异常 1.2 检测(捕获)异常1.2.1 try except 语句1.2.2 捕获多种异常1.2.3 捕获所有异常 1.3 处理异常1.4 特殊场景1.4.1 with 语句 2. 内网主机存活检测程序2.1 scapy 模块2.1.1 主要功能2.1.2 scapy 安装…...

【20230919】win11无法删除Chrome注册表项
win11无法删除Chrome注册表项 删除以下注册表项发生错误: 计算机\HKEY_LOCAL_MACHINE\SOFTWAR\Google计算机\HKEY_CURRENT_USER\Software\Google 尝试了很多删除注册表方法(例如:编辑remove.reg文件),都不行。 无法…...

TCP/IP客户端和服务器端建立通信过程
客户端和服务器端建立通信过程 使用Qt提供的类进行基于TCP的套接字通信需要用到两个类: QTcpServer:服务器类,用于监听客户端连接以及和客户端建立连接。 QTcpSocket:通信的套接字类,客户端、服务器端都需要使用。服务…...

Python ---使用Fake库向clickhouse造数据小案例
每次insert太麻烦了 先在clickhosue中建表 test_user表 CREATE TABLE dwh.test_user (name String,age Int32,address String,phone String,email String ) ENGINE MergeTree() ORDER BY name; 此时表中暂无数据 用Python脚本来造一些数据 from faker import Faker from c…...

09MyBatisX插件
MyBatisX插件 在真正开发过程中对于一些复杂的SQL和多表联查就需要我们自己去编写代码和SQL语句,这个时候可以使用MyBatisX插件帮助我们简化开发 安装MyBatisX插件: File -> Settings -> Plugins -> 搜索MyBatisx插件搜索安装然后重启IDEA 跳转文件功能 由于一个项…...
使用 Messenger 跨进程通信
什么是Messenger Messenger 也是IPC的方案之一,是基于消息的跨进程通信。基于消息是什么意思?Handler是我们最常用的消息机制,所以 Messenger 对于使用者来说就像是使用 Handler。实际上 Messenger 就是 AIDL 的上层封装而已,它们…...
Spring Cloud Gateway
路由谓词工厂 Route Predicate Factory 1. The After Route Predicate Factory spring:cloud:gateway:routes:- id: after_routeuri: https://example.orgpredicates:- After2017-01-20T17:42:47.789-07:00[America/Denver]# 用日期时间匹配 2. The Before Route Pr…...
JVM 优化技术
文章目录 JVM 优化技术概述方法内联优化说明优点内联条件 栈帧之间数据共享说明优点栈帧之间数据共享条件 JVM 优化技术 概述 JVM常见的优化技术: 方法内联优化。栈帧之间数据共享。 方法内联优化 说明 方法内联(Method Inlining)是JVM…...

【MySQL系列】- MySQL自动备份详解
【MySQL系列】- MySQL自动备份详解 文章目录 【MySQL系列】- MySQL自动备份详解一、需求背景二、Windows mysql自动备份方法2.1 复制date文件夹备份实验备份环境创建bat直接备份脚本 2 .2 mysqldump备份成sql文件创建mysqldump备份脚本 2 .3 利用WinRAR对MySQL数据库进行定时备…...

指针笔试题讲解-----让指针简单易懂(2)
目录 回顾上篇重点 : 一.笔试题 ( 1 ) 二.笔试题 ( 2 ) 科普进制知识点 (1) 二进制 (2) 八进制 (3)十六进制 三.笔试题( 3 ) 四.笔试题( 4 ) 五.笔试题( 5 ) 六.笔试题( …...
使用windbg分析dump文件的方法
https://zhuanlan.zhihu.com/p/613434365 一般操作如下: 准备工作。 打开dump文件。指定符号表文件的路径。指定可执行文件的路径。指定源码文件的路径。在windbg的命令行,输入并执行如下命令 .reload,重新加载前述数据文件。!analyze -v&a…...

【论文阅读 07】Anomaly region detection and localization in metal surface inspection
比较老的一篇论文,金属表面检测中的异常区域检测与定位 总结:提出了一个找模板图的方法,使用SIFT做特征提取,姿态估计看差异有哪些,Hough聚类做描述符筛选,仿射变换可视化匹配图之间的关系…...

SSM - Springboot - MyBatis-Plus 全栈体系(十一)
第二章 SpringFramework 五、Spring AOP 面向切面编程 6. Spring AOP 基于 XML 方式实现(了解) 6.1 准备工作 加入依赖和基于注解的 AOP 时一样。准备代码把测试基于注解功能时的 Java 类复制到新 module 中,去除所有注解。 6.2 配置 Sp…...
内存分配函数malloc kmalloc vmalloc
内存分配函数malloc kmalloc vmalloc malloc实现步骤: 1)请求大小调整:首先,malloc 需要调整用户请求的大小,以适应内部数据结构(例如,可能需要存储额外的元数据)。通常,这包括对齐调整,确保分配的内存地址满足特定硬件要求(如对齐到8字节或16字节边界)。 2)空闲…...

7.4.分块查找
一.分块查找的算法思想: 1.实例: 以上述图片的顺序表为例, 该顺序表的数据元素从整体来看是乱序的,但如果把这些数据元素分成一块一块的小区间, 第一个区间[0,1]索引上的数据元素都是小于等于10的, 第二…...

前端开发面试题总结-JavaScript篇(一)
文章目录 JavaScript高频问答一、作用域与闭包1.什么是闭包(Closure)?闭包有什么应用场景和潜在问题?2.解释 JavaScript 的作用域链(Scope Chain) 二、原型与继承3.原型链是什么?如何实现继承&a…...

QT: `long long` 类型转换为 `QString` 2025.6.5
在 Qt 中,将 long long 类型转换为 QString 可以通过以下两种常用方法实现: 方法 1:使用 QString::number() 直接调用 QString 的静态方法 number(),将数值转换为字符串: long long value 1234567890123456789LL; …...
React---day11
14.4 react-redux第三方库 提供connect、thunk之类的函数 以获取一个banner数据为例子 store: 我们在使用异步的时候理应是要使用中间件的,但是configureStore 已经自动集成了 redux-thunk,注意action里面要返回函数 import { configureS…...

技术栈RabbitMq的介绍和使用
目录 1. 什么是消息队列?2. 消息队列的优点3. RabbitMQ 消息队列概述4. RabbitMQ 安装5. Exchange 四种类型5.1 direct 精准匹配5.2 fanout 广播5.3 topic 正则匹配 6. RabbitMQ 队列模式6.1 简单队列模式6.2 工作队列模式6.3 发布/订阅模式6.4 路由模式6.5 主题模式…...

短视频矩阵系统文案创作功能开发实践,定制化开发
在短视频行业迅猛发展的当下,企业和个人创作者为了扩大影响力、提升传播效果,纷纷采用短视频矩阵运营策略,同时管理多个平台、多个账号的内容发布。然而,频繁的文案创作需求让运营者疲于应对,如何高效产出高质量文案成…...

Unity中的transform.up
2025年6月8日,周日下午 在Unity中,transform.up是Transform组件的一个属性,表示游戏对象在世界空间中的“上”方向(Y轴正方向),且会随对象旋转动态变化。以下是关键点解析: 基本定义 transfor…...
全面解析数据库:从基础概念到前沿应用
在数字化时代,数据已成为企业和社会发展的核心资产,而数据库作为存储、管理和处理数据的关键工具,在各个领域发挥着举足轻重的作用。从电商平台的商品信息管理,到社交网络的用户数据存储,再到金融行业的交易记录处理&a…...

解析两阶段提交与三阶段提交的核心差异及MySQL实现方案
引言 在分布式系统的事务处理中,如何保障跨节点数据操作的一致性始终是核心挑战。经典的两阶段提交协议(2PC)通过准备阶段与提交阶段的协调机制,以同步决策模式确保事务原子性。其改进版本三阶段提交协议(3PC…...