当前位置: 首页 > news >正文

【ClickHouse源码】物化视图的写入过程

本文对 ClickHouse 物化视图的写入流程源码做个详细说明,基于 v22.8.14.53-lts 版本。

StorageMaterializedView

首先来看物化视图的构造函数:

StorageMaterializedView::StorageMaterializedView(const StorageID & table_id_,ContextPtr local_context,const ASTCreateQuery & query,const ColumnsDescription & columns_,bool attach_,const String & comment): IStorage(table_id_), WithMutableContext(local_context->getGlobalContext())
{StorageInMemoryMetadata storage_metadata;storage_metadata.setColumns(columns_);......if (!has_inner_table){target_table_id = query.to_table_id;}else if (attach_){/// If there is an ATTACH request, then the internal table must already be created.target_table_id = StorageID(getStorageID().database_name, generateInnerTableName(getStorageID()), query.to_inner_uuid);}else{/// We will create a query to create an internal table.auto create_context = Context::createCopy(local_context);auto manual_create_query = std::make_shared<ASTCreateQuery>();manual_create_query->setDatabase(getStorageID().database_name);manual_create_query->setTable(generateInnerTableName(getStorageID()));manual_create_query->uuid = query.to_inner_uuid;auto new_columns_list = std::make_shared<ASTColumns>();new_columns_list->set(new_columns_list->columns, query.columns_list->columns->ptr());manual_create_query->set(manual_create_query->columns_list, new_columns_list);manual_create_query->set(manual_create_query->storage, query.storage->ptr());InterpreterCreateQuery create_interpreter(manual_create_query, create_context);create_interpreter.setInternal(true);create_interpreter.execute();target_table_id = DatabaseCatalog::instance().getTable({manual_create_query->getDatabase(), manual_create_query->getTable()}, getContext())->getStorageID();}
}

通过以上代码可以发现物化视图支持几种创建语法,总的来说可以归为 3 类:

  1. 指定了目的表的情况:

    create table src(id Int32) Engine=Memory();
    create table dest(id Int32) Engine=Memory();create materialized view mv to dest as select * from src;
    

    使用以上形式时,target_table_id 会选择 dest 表的 table_id

  2. 不指定目的表的情况:

    create table src(id Int32) Engine=Memory();create materialized view mv Engine=Memory() as select * from src;
    

    使用以上形式时,首先会根据源表的 table_id 生成一个以 .inner. 开头的目的表名,如 .inner.5ef4ec2c-efb1-4918-bf6c-59de2edb54cf,然后在生成一个随机的 uuid 作为目的表的 table_id 并同时作为 target_table_id

  3. 第 3 种其实不是创建语法,而是在 ClickHouse 启动或者物化视图被 detach 掉后,执行 attach 的实现。

StorageMaterializedView::read

void StorageMaterializedView::read(QueryPlan & query_plan,const Names & column_names,const StorageSnapshotPtr & storage_snapshot,SelectQueryInfo & query_info,ContextPtr local_context,QueryProcessingStage::Enum processed_stage,const size_t max_block_size,const size_t num_streams)
{/// 获取目的表实例auto storage = getTargetTable();auto lock = storage->lockForShare(local_context->getCurrentQueryId(), local_context->getSettingsRef().lock_acquire_timeout);auto target_metadata_snapshot = storage->getInMemoryMetadataPtr();auto target_storage_snapshot = storage->getStorageSnapshot(target_metadata_snapshot, local_context);if (query_info.order_optimizer)query_info.input_order_info = query_info.order_optimizer->getInputOrder(target_metadata_snapshot, local_context);storage->read(query_plan, column_names, target_storage_snapshot, query_info, local_context, processed_stage, max_block_size, num_streams);if (query_plan.isInitialized()){/// 获取物化视图 stream 中对应的 block 结构auto mv_header = getHeaderForProcessingStage(column_names, storage_snapshot, query_info, local_context, processed_stage);/// 获取查询语句中所需的列对应的 block 结构auto target_header = query_plan.getCurrentDataStream().header;/// 从查询的列中去除那些mv不存在的列removeNonCommonColumns(mv_header, target_header);/// 分布式表引擎在查询处理到指定阶段,header 中可能不包含物化视图中的所有列,例如 group by/// 所以从 mv_header 中去除那些查询不需要的列removeNonCommonColumns(target_header, mv_header);/// 当查询中得到的 mv_header 和 target_header 有不同结构时,会通过在 pipeline 中添加表达式计算来进行转换/// 比如 Decimal(38, 6) -> Decimal(16, 6),或者一些聚合运算,如 sum 等if (!blocksHaveEqualStructure(mv_header, target_header)){auto converting_actions = ActionsDAG::makeConvertingActions(target_header.getColumnsWithTypeAndName(),mv_header.getColumnsWithTypeAndName(),ActionsDAG::MatchColumnsMode::Name);auto converting_step = std::make_unique<ExpressionStep>(query_plan.getCurrentDataStream(), converting_actions);converting_step->setStepDescription("Convert target table structure to MaterializedView structure");query_plan.addStep(std::move(converting_step));}query_plan.addStorageHolder(storage);query_plan.addTableLock(std::move(lock));}
}

通过以上代码可以看出,物化视图是一种逻辑描述,数据都是存储在目的表中,读取时实际操作的目的表,并且在在查询过程中还会涉及到多阶段 block 的转换,以及表达式的计算。

StorageMaterializedView::write

SinkToStoragePtr StorageMaterializedView::write(const ASTPtr & query, const StorageMetadataPtr & /*metadata_snapshot*/, ContextPtr local_context)
{auto storage = getTargetTable();auto lock = storage->lockForShare(local_context->getCurrentQueryId(), local_context->getSettingsRef().lock_acquire_timeout);auto metadata_snapshot = storage->getInMemoryMetadataPtr();auto sink = storage->write(query, metadata_snapshot, local_context);sink->addTableLock(lock);return sink;
}

同样写也是将数据存入了目的表。


我们都知道数据写源表时会触发写物化视图,从而将数据写入目的表,下面就看一下是如何实现的。SQL 的执行都是通过 IInterpreterInterpreterXxx 的,这里就不再多说,一个写入操作最中会调用 InterpreterInsertQuery,所以从 InterpreterInsertQuery::execute() 开始跟踪。

InterpreterInsertQuery::execute()

BlockIO InterpreterInsertQuery::execute()
{......std::vector<Chain> out_chains;if (!distributed_pipeline || query.watch){size_t out_streams_size = 1;......for (size_t i = 0; i < out_streams_size; ++i){auto out = buildChainImpl(table, metadata_snapshot, query_sample_block, nullptr, nullptr);out_chains.emplace_back(std::move(out));}}......
}

execute() 中通过 buildChainImpl() 来构建输出链, buildChainImpl() 会判断当前表是否有物化视图关联,如果有就会调用 buildPushingToViewsChain()

buildPushingToViewsChain()

这个方法非常长,这里只展示和本文想说明的问题相关的部分。

Chain buildPushingToViewsChain(const StoragePtr & storage,const StorageMetadataPtr & metadata_snapshot,ContextPtr context,const ASTPtr & query_ptr,bool no_destination,ThreadStatusesHolderPtr thread_status_holder,std::atomic_uint64_t * elapsed_counter_ms,const Block & live_view_header)
{......auto table_id = storage->getStorageID();auto views = DatabaseCatalog::instance().getDependentViews(table_id);......std::vector<Chain> chains;for (const auto & view_id : views){auto view = DatabaseCatalog::instance().tryGetTable(view_id, context);......if (auto * materialized_view = dynamic_cast<StorageMaterializedView *>(view.get())){......StoragePtr inner_table = materialized_view->getTargetTable();auto inner_table_id = inner_table->getStorageID();auto inner_metadata_snapshot = inner_table->getInMemoryMetadataPtr();query = view_metadata_snapshot->getSelectQuery().inner_query;target_name = inner_table_id.getFullTableName();Block header;/// Get list of columns we get from select query.if (select_context->getSettingsRef().allow_experimental_analyzer)header = InterpreterSelectQueryAnalyzer::getSampleBlock(query, select_context);elseheader = InterpreterSelectQuery(query, select_context, SelectQueryOptions().analyze()).getSampleBlock();/// Insert only columns returned by select.Names insert_columns;const auto & inner_table_columns = inner_metadata_snapshot->getColumns();for (const auto & column : header){/// But skip columns which storage doesn't have.if (inner_table_columns.hasPhysical(column.name))insert_columns.emplace_back(column.name);}InterpreterInsertQuery interpreter(nullptr, insert_context, false, false, false);out = interpreter.buildChain(inner_table, inner_metadata_snapshot, insert_columns, thread_status_holder, view_counter_ms);out.addStorageHolder(view);out.addStorageHolder(inner_table);}else if (auto * live_view = dynamic_cast<StorageLiveView *>(view.get())){runtime_stats->type = QueryViewsLogElement::ViewType::LIVE;query = live_view->getInnerQuery(); // Used only to log in system.query_views_logout = buildPushingToViewsChain(view, view_metadata_snapshot, insert_context, ASTPtr(), true, thread_status_holder, view_counter_ms, storage_header);}else if (auto * window_view = dynamic_cast<StorageWindowView *>(view.get())){runtime_stats->type = QueryViewsLogElement::ViewType::WINDOW;query = window_view->getMergeableQuery(); // Used only to log in system.query_views_logout = buildPushingToViewsChain(view, view_metadata_snapshot, insert_context, ASTPtr(), true, thread_status_holder, view_counter_ms);}elseout = buildPushingToViewsChain(view, view_metadata_snapshot, insert_context, ASTPtr(), false, thread_status_holder, view_counter_ms);......
}

buildPushingToViewsChain() 会检查当前表是否有视图依赖,通过几个判断可以看出视图分为三种:物化视图、实时视图和窗口视图,最后的 else 是指当前表是个普通表。如果当前表是源表且有物化视图依赖,就会调用 buildPushingToViewsChain() 来构建链,这是个递归调用,首次进入当前表是普通表,其依赖的物化视图会再次调用该方法,再次进入就会物化视图的 if 逻辑,最终是通过 buildChain() 来构建链。

buildChainImpl

buildChain() 中是调用了 buildChainImpl() 这个实现类。

Chain InterpreterInsertQuery::buildChainImpl(const StoragePtr & table,const StorageMetadataPtr & metadata_snapshot,const Block & query_sample_block,ThreadStatusesHolderPtr thread_status_holder,std::atomic_uint64_t * elapsed_counter_ms)
{....../// We create a pipeline of several streams, into which we will write data.Chain out;/// Keep a reference to the context to make sure it stays alive until the chain is executed and destroyedout.addInterpreterContext(context_ptr);/// NOTE: we explicitly ignore bound materialized views when inserting into Kafka Storage.///       Otherwise we'll get duplicates when MV reads same rows again from Kafka.if (table->noPushingToViews() && !no_destination){auto sink = table->write(query_ptr, metadata_snapshot, context_ptr);sink->setRuntimeData(thread_status, elapsed_counter_ms);out.addSource(std::move(sink));}else{out = buildPushingToViewsChain(table, metadata_snapshot, context_ptr, query_ptr, no_destination, thread_status_holder, elapsed_counter_ms);}......
}

buildChainImpl() 会根据当前表(或视图)是否有依赖的视图或目的表,来做不同的操作,这里就可以处理视图级连视图的情况,会不断递归构造相应的链节点,使之连接起来。

Chain InterpreterInsertQuery::buildChainImpl(const StoragePtr & table,const StorageMetadataPtr & metadata_snapshot,const Block & query_sample_block,ThreadStatusesHolderPtr thread_status_holder,std::atomic_uint64_t * elapsed_counter_ms)
{.../// We create a pipeline of several streams, into which we will write data.Chain out;/// Keep a reference to the context to make sure it stays alive until the chain is executed and destroyedout.addInterpreterContext(context_ptr);/// NOTE: we explicitly ignore bound materialized views when inserting into Kafka Storage.///       Otherwise we'll get duplicates when MV reads same rows again from Kafka.if (table->noPushingToViews() && !no_destination)  // table->noPushingToViews() 用于禁止物化视图插入数据到 KafkaEngine{auto sink = table->write(query_ptr, metadata_snapshot, context_ptr);sink->setRuntimeData(thread_status, elapsed_counter_ms);out.addSource(std::move(sink));}else  // 构建物化视图插入 pushingToViewChain,重点!!!{out = buildPushingToViewsChain(table, metadata_snapshot, context_ptr, query_ptr, no_destination, thread_status_holder, elapsed_counter_ms);}...return out;
}

小结

所以源表和物化视图在写入时是构造了多个输出链,数据也是只能对当前写入的数据做操作,不会影响源表现有数据。而且写入源表和目的表的过程是一个 pipeline,需要全部完成才算写入成功,当然 pipeline 可以并行处理,可以加快写入速度。


欢迎添加微信:xiedeyantu,讨论技术问题。

相关文章:

【ClickHouse源码】物化视图的写入过程

本文对 ClickHouse 物化视图的写入流程源码做个详细说明&#xff0c;基于 v22.8.14.53-lts 版本。 StorageMaterializedView 首先来看物化视图的构造函数&#xff1a; StorageMaterializedView::StorageMaterializedView(const StorageID & table_id_,ContextPtr local_…...

.NET 使用NLog增强日志输出

引言 不管你是开发单体应用还是微服务应用&#xff0c;在实际的软件的开发、测试和运行阶段&#xff0c;开发者都需要借助日志来定位问题。因此一款好的日志组件将至关重要&#xff0c;在.NET 的开源生态中&#xff0c;目前主要有Serilog、Log4Net和NLog三款优秀的日志组件&…...

一道阿里类的初始化顺序笔试题

问题很简单&#xff0c;就是下面的代码打印出什么&#xff1f; public class InitializeDemo {private static int k 1;private static InitializeDemo t1 new InitializeDemo("t1" );private static InitializeDemo t2 new InitializeDemo("t2");priv…...

cuda找不到路径报错

编译C文件时出现&#xff1a;error: [Errno 2] No such file or directory: :/usr/local/cuda:/usr/local/cuda/bin/nvcc 在终端输入&#xff1a; export CUDA_HOME/usr/local/cuda...

Elasticsearch进阶之(核心概念、系统架构、路由计算、倒排索引、分词、Kibana)

Elasticsearch进阶之&#xff08;核心概念、系统架构、路由计算、倒排索引、分词、Kibana&#xff09; 1、核心概念&#xff1a; 1.1、索引&#xff08;Index&#xff09; 一个索引就是一个拥有几分相似特征的文档的集合。比如说&#xff0c;你可以有一个客户数据的索引&…...

Android包体积缩减

关于减小包体积的方案&#xff1a; 一、所有的图片压缩&#xff0c;采用webp 格式。 &#xff08;当然有些图片采用webp格式反而变大了&#xff0c;可以仍采用png格式&#xff09; 二、语音资源过滤 只保留中文 resConfigs "zh-rCN", "zh” 可以减少resourc…...

【华为OD机试】 网上商城优惠活动(C++ Java Javascript Python)

文章目录 题目描述输入描述输出描述备注用例题目解析C++JavaScriptJavaPython题目描述 某网上商场举办优惠活动,发布了满减、打折、无门槛3种优惠券,分别为: 每满100元优惠10元,无使用数限制,如100199元可以使用1张减10元,200299可使用2张减20元,以此类推;92折券,1次…...

GWT安装过程

1:安装前准备 &#xff08;可以问我要&#xff09; appengine-java-sdk-1.9.8 com.google.gdt.eclipse.suite.4.3.update.site_3.8.0 gwt-2.5.1 eclipse-jee-kepler-SR2-win32-x86_64.zip 2&#xff1a;安装环境上 打开eclipse Help –Install New Software… 选择Add –…...

代码随想录算法训练营第一天| 704. 二分查找、27. 移除元素

Leetcode 704 二分查找题目链接&#xff1a;704二分查找介绍给定一个 n 个元素有序的&#xff08;升序&#xff09;整型数组 nums 和一个目标值 target &#xff0c;写一个函数搜索 nums 中的 target&#xff0c;如果目标值存在返回下标&#xff0c;否则返回 -1。思路先看看一个…...

office@word@ppt启用mathtype组件方法整理

文章目录将mathtype添加到word中ref查看office安装路径文件操作法Note附PPT中使用mathtype将mathtype添加到word中 先安装office,再安装mathtype,那么这个过程是自动的如果是先安装mathtype,再安装office,那么有以下选择: 重新安装一遍mathtype(比较简单,不需要说明)执行文件操…...

计算机大小端

我们先假定内存结构为上下型的&#xff0c;上代表内存高地址&#xff0c;下代表内存低地址。 电脑读取内存数据时&#xff0c;是从低位地址到高位地址进行读取&#xff08;从下到上&#xff09;。 1、何为大小端 大端&#xff1a;数据的高位字节存放在低地址&#xff0c;数据…...

Matplotlib绘图从零入门到实践(含各类用法详解)

一、引入 Matplotlib 是一个Python的综合库&#xff0c;用于在 Python 中创建静态、动画和交互式可视化。 本教程包含笔者在使用Matplotlib库过程中遇到的各类完整实例与用法还有遇到的库理论问题&#xff0c;可以根据自己的需要在目录中查询对应的用法、实例以及第四部分关于…...

C语言 入门教程||C语言 指针||C语言 字符串

C语言 指针 学习 C 语言的指针既简单又有趣。通过指针&#xff0c;可以简化一些 C 编程任务的执行&#xff0c;还有一些任务&#xff0c;如动态内存分配&#xff0c;没有指针是无法执行的。所以&#xff0c;想要成为一名优秀的 C 程序员&#xff0c;学习指针是很有必要的。 …...

Nacos2.x+Nginx集群配置

一、配置 nacos 集群 注意&#xff1a;需要先配置好 nacos 连接本地数据库 1、拷贝三份 nacos 2、修改配置文件&#xff08;cluster.conf&#xff09; 修改启动端口&#xff1a; nacos1&#xff1a;8818 nacos2&#xff1a;8828 nacos3&#xff1a;8838 当nacos客户端升级为…...

Android源码分析 - InputManagerService与触摸事件

0. 前言 有人问到&#xff1a;“通过TouchEvent&#xff0c;你可以获得到当前的触点&#xff0c;它更新的频率和屏幕刷新的频率一样吗&#xff1f;”。听到这个问题的时候我感到很诧异&#xff0c;我们知道Android是事件驱动机制的设计&#xff0c;可以从多种服务中通过IPC通信…...

python库--urllib

目录 一.urllib导入 二.urllib爬取网页 三.Headers属性 1.使用build_opener()修改报头 2.使用add_header()添加报头 四.超时设置 五.get和post请求 1.get请求 2.post请求 urllib库和request库作用差不多&#xff0c;但比较起来request库更加容易上手&#xff0c;但该了…...

美团前端二面常考react面试题及答案

什么原因会促使你脱离 create-react-app 的依赖 当你想去配置 webpack 或 babel presets。 React 16中新生命周期有哪些 关于 React16 开始应用的新生命周期&#xff1a; 可以看出&#xff0c;React16 自上而下地对生命周期做了另一种维度的解读&#xff1a; Render 阶段&a…...

环境搭建04-Ubuntu16.04更改conda,pip的镜像源

我常用的pipy国内镜像源&#xff1a; https://pypi.tuna.tsinghua.edu.cn/simple # 清华 http://mirrors.aliyun.com/pypi/simple/ # 阿里云 https://pypi.mirrors.ustc.edu.cn/simple/ #中国科技大学1、将conda的镜像源修改为国内的镜像源 先查看conda安装的信息…...

【C++进阶】四、STL---set和map的介绍和使用

目录 一、关联式容器 二、键值对 三、树形结构的关联式容器 四、set的介绍及使用 4.1 set的介绍 4.2 set的使用 五、multiset的介绍及使用 六、map的介绍和使用 6.1 map的介绍 6.2 map的使用 七、multimap的介绍和使用 一、关联式容器 前面已经接触过 STL 中的部分…...

JavaSE学习进阶 day1_01 static关键字和静态代码块的使用

好的现在我们进入进阶部分的学习&#xff0c;看一张版图&#xff1a; 前面我们已经学习完基础班的内容了&#xff0c;现在我们已经来到了第二板块——基础进阶&#xff0c;这部分内容就不是那么容易了。学完第二板块&#xff0c;慢慢就在向java程序员靠拢了。 面向对象进阶部分…...

铭豹扩展坞 USB转网口 突然无法识别解决方法

当 USB 转网口扩展坞在一台笔记本上无法识别,但在其他电脑上正常工作时,问题通常出在笔记本自身或其与扩展坞的兼容性上。以下是系统化的定位思路和排查步骤,帮助你快速找到故障原因: 背景: 一个M-pard(铭豹)扩展坞的网卡突然无法识别了,扩展出来的三个USB接口正常。…...

《Qt C++ 与 OpenCV:解锁视频播放程序设计的奥秘》

引言:探索视频播放程序设计之旅 在当今数字化时代,多媒体应用已渗透到我们生活的方方面面,从日常的视频娱乐到专业的视频监控、视频会议系统,视频播放程序作为多媒体应用的核心组成部分,扮演着至关重要的角色。无论是在个人电脑、移动设备还是智能电视等平台上,用户都期望…...

React第五十七节 Router中RouterProvider使用详解及注意事项

前言 在 React Router v6.4 中&#xff0c;RouterProvider 是一个核心组件&#xff0c;用于提供基于数据路由&#xff08;data routers&#xff09;的新型路由方案。 它替代了传统的 <BrowserRouter>&#xff0c;支持更强大的数据加载和操作功能&#xff08;如 loader 和…...

如何为服务器生成TLS证书

TLS&#xff08;Transport Layer Security&#xff09;证书是确保网络通信安全的重要手段&#xff0c;它通过加密技术保护传输的数据不被窃听和篡改。在服务器上配置TLS证书&#xff0c;可以使用户通过HTTPS协议安全地访问您的网站。本文将详细介绍如何在服务器上生成一个TLS证…...

【Zephyr 系列 10】实战项目:打造一个蓝牙传感器终端 + 网关系统(完整架构与全栈实现)

🧠关键词:Zephyr、BLE、终端、网关、广播、连接、传感器、数据采集、低功耗、系统集成 📌目标读者:希望基于 Zephyr 构建 BLE 系统架构、实现终端与网关协作、具备产品交付能力的开发者 📊篇幅字数:约 5200 字 ✨ 项目总览 在物联网实际项目中,**“终端 + 网关”**是…...

Java入门学习详细版(一)

大家好&#xff0c;Java 学习是一个系统学习的过程&#xff0c;核心原则就是“理论 实践 坚持”&#xff0c;并且需循序渐进&#xff0c;不可过于着急&#xff0c;本篇文章推出的这份详细入门学习资料将带大家从零基础开始&#xff0c;逐步掌握 Java 的核心概念和编程技能。 …...

uniapp中使用aixos 报错

问题&#xff1a; 在uniapp中使用aixos&#xff0c;运行后报如下错误&#xff1a; AxiosError: There is no suitable adapter to dispatch the request since : - adapter xhr is not supported by the environment - adapter http is not available in the build 解决方案&…...

pikachu靶场通关笔记22-1 SQL注入05-1-insert注入(报错法)

目录 一、SQL注入 二、insert注入 三、报错型注入 四、updatexml函数 五、源码审计 六、insert渗透实战 1、渗透准备 2、获取数据库名database 3、获取表名table 4、获取列名column 5、获取字段 本系列为通过《pikachu靶场通关笔记》的SQL注入关卡(共10关&#xff0…...

论文笔记——相干体技术在裂缝预测中的应用研究

目录 相关地震知识补充地震数据的认识地震几何属性 相干体算法定义基本原理第一代相干体技术&#xff1a;基于互相关的相干体技术&#xff08;Correlation&#xff09;第二代相干体技术&#xff1a;基于相似的相干体技术&#xff08;Semblance&#xff09;基于多道相似的相干体…...

A2A JS SDK 完整教程:快速入门指南

目录 什么是 A2A JS SDK?A2A JS 安装与设置A2A JS 核心概念创建你的第一个 A2A JS 代理A2A JS 服务端开发A2A JS 客户端使用A2A JS 高级特性A2A JS 最佳实践A2A JS 故障排除 什么是 A2A JS SDK? A2A JS SDK 是一个专为 JavaScript/TypeScript 开发者设计的强大库&#xff…...