当前位置: 首页 > news >正文

milvus Delete API流程源码分析

Delete API执行流程源码解析

milvus版本:v2.3.2

整体架构:

在这里插入图片描述

Delete 的数据流向:

在这里插入图片描述

1.客户端sdk发出Delete API请求。

from pymilvus import (connections,Collection,
)print("start connecting to Milvus")
connections.connect("default", host="192.168.230.71", port="19530")hello_milvus = Collection("hello_milvus")print("Start delete entities")
## expr = "book_id in [0,1]" 主键
expr = "pk in [447868867306324066,447868867306324067]" ## 非主键
delete_result = hello_milvus.delete(expr)
print(delete_result)

2.服务端接受API请求,将request封装为deleteTask,并压入dmQueue队列。

注意这里是dmQueue。DDL类型的是ddQueue。

代码路径:internal\proxy\impl.go

// Delete delete records from collection, then these records cannot be searched.
func (node *Proxy) Delete(ctx context.Context, request *milvuspb.DeleteRequest) (*milvuspb.MutationResult, error) {......// request封装为deleteTaskdt := &deleteTask{ctx:         ctx,Condition:   NewTaskCondition(ctx),req:         request,idAllocator: node.rowIDAllocator,chMgr:       node.chMgr,chTicker:    node.chTicker,lb:          node.lbPolicy,}......// 将task压入dmQueue队列// MsgID will be set by Enqueue()if err := node.sched.dmQueue.Enqueue(dt); err != nil {......}......// 等待任务执行完if err := dt.WaitToFinish(); err != nil {......}......
}

DeleteRequest数据结构:

type DeleteRequest struct {Base                 *commonpb.MsgBaseDbName               stringCollectionName       stringPartitionName        stringExpr                 stringHashKeys             []uint32XXX_NoUnkeyedLiteral struct{}XXX_unrecognized     []byteXXX_sizecache        int32
}

3.执行deleteTask的3个方法PreExecute、Execute、PostExecute。

PreExecute()一般为参数校验等工作。

Execute()为真正执行逻辑。

PostExecute()执行完后的逻辑,什么都不做,返回nil。

代码路径:internal\proxy\task_delete.go

func (dt *deleteTask) Execute(ctx context.Context) (err error) {ctx, sp := otel.Tracer(typeutil.ProxyRole).Start(ctx, "Proxy-Delete-Execute")defer sp.End()log := log.Ctx(ctx)if len(dt.req.GetExpr()) == 0 {return merr.WrapErrParameterInvalid("valid expr", "empty expr", "invalid expression")}dt.tr = timerecord.NewTimeRecorder(fmt.Sprintf("proxy execute delete %d", dt.ID()))// 拿到stream,类型为msgstream.mqMsgStreamstream, err := dt.chMgr.getOrCreateDmlStream(dt.collectionID)if err != nil {return err}plan, err := planparserv2.CreateRetrievePlan(dt.schema, dt.req.Expr)if err != nil {return fmt.Errorf("failed to create expr plan, expr = %s", dt.req.GetExpr())}// 判断走simpleDelete还是complexDeleteisSimple, termExp := getExpr(plan)if isSimple {// if could get delete.primaryKeys from delete exprerr := dt.simpleDelete(ctx, termExp, stream)if err != nil {return err}} else {// if get complex delete expr// need query from querynode before deleteerr = dt.complexDelete(ctx, plan, stream)if err != nil {log.Warn("complex delete failed,but delete some data", zap.Int("count", dt.count), zap.String("expr", dt.req.GetExpr()))return err}}return nil
}

expr如果是主键表达式则走simpleDelete,否则走complexDelete。

4.simpleDelete

func (dt *deleteTask) simpleDelete(ctx context.Context, termExp *planpb.Expr_TermExpr, stream msgstream.MsgStream) error {primaryKeys, numRow, err := getPrimaryKeysFromExpr(dt.schema, termExp)if err != nil {log.Info("Failed to get primary keys from expr", zap.Error(err))return err}log.Debug("get primary keys from expr",zap.Int64("len of primary keys", numRow),zap.Int64("collectionID", dt.collectionID),zap.Int64("partationID", dt.partitionID))err = dt.produce(ctx, stream, primaryKeys)if err != nil {return err}return nil
}

函数getPrimaryKeysFromExpr()的返回schemapb.IDs。

type IDs struct {// Types that are valid to be assigned to IdField:////	*IDs_IntId//	*IDs_StrIdIdField              isIDs_IdFieldXXX_NoUnkeyedLiteral struct{}XXX_unrecognized     []byteXXX_sizecache        int32
}

isIDs_IdField是一个接口类型。

type isIDs_IdField interface {isIDs_IdField()
}

isIDs_IdField有2个实现:

  • IDs_IntId
  • IDs_StrId
type IDs_IntId struct {IntId *LongArray
}type LongArray struct {Data                 []int64XXX_NoUnkeyedLiteral struct{}XXX_unrecognized     []byteXXX_sizecache        int32
}type IDs_StrId struct {StrId *StringArray
}type StringArray struct {Data                 []stringXXX_NoUnkeyedLiteral struct{}XXX_unrecognized     []byteXXX_sizecache        int32
}

在这里插入图片描述

从expr提取主键存储到变量primaryKeys。

5.dt.produce()

func (dt *deleteTask) produce(ctx context.Context, stream msgstream.MsgStream, primaryKeys *schemapb.IDs) error {// 根据vchannels计算hashhashValues := typeutil.HashPK2Channels(primaryKeys, dt.vChannels)// repack delete msg by dmChannelresult := make(map[uint32]msgstream.TsMsg)numRows := int64(0)for index, key := range hashValues {vchannel := dt.vChannels[key]_, ok := result[key]if !ok {// 创建deleteMsgdeleteMsg, err := dt.newDeleteMsg(ctx)if err != nil {return err}deleteMsg.ShardName = vchannelresult[key] = deleteMsg}curMsg := result[key].(*msgstream.DeleteMsg)curMsg.HashValues = append(curMsg.HashValues, hashValues[index])curMsg.Timestamps = append(curMsg.Timestamps, dt.ts)typeutil.AppendIDs(curMsg.PrimaryKeys, primaryKeys, index)curMsg.NumRows++numRows++}// send delete request to log brokermsgPack := &msgstream.MsgPack{BeginTs: dt.BeginTs(),EndTs:   dt.EndTs(),}// 将deleteMsg包装进msgPackfor _, msg := range result {if msg != nil {msgPack.Msgs = append(msgPack.Msgs, msg)}}log.Debug("send delete request to virtual channels",zap.String("collectionName", dt.req.GetCollectionName()),zap.Int64("collectionID", dt.collectionID),zap.Strings("virtual_channels", dt.vChannels),zap.Int64("taskID", dt.ID()),zap.Duration("prepare duration", dt.tr.RecordSpan()))// 发送给mqerr := stream.Produce(msgPack)if err != nil {return err}dt.result.DeleteCnt += numRowsreturn nil
}

msgstream.TsMsg是一个接口类型。

有如下实现:

  • createCollectionMsg
  • CreateDatabaseMsg
  • CreateIndexMsg
  • createPartitionMsg
  • DataNodeTtMsg
  • DeleteMsg
  • DropCollectionMsg
  • DropDatabaseMsg
  • DropIndexMsg
  • DropPartitionMsg
  • FlushMsg
  • InsertMsg
  • LoadCollectionMsg
  • ReleaseCollectionMsg
  • TimeTickMsg

6.complexDelete

func (dt *deleteTask) complexDelete(ctx context.Context, plan *planpb.PlanNode, stream msgstream.MsgStream) error {err := dt.lb.Execute(ctx, CollectionWorkLoad{db:             dt.req.GetDbName(),collectionName: dt.req.GetCollectionName(),collectionID:   dt.collectionID,nq:             1,exec:           dt.getStreamingQueryAndDelteFunc(stream, plan),})if err != nil {log.Warn("fail to get or create dml stream", zap.Error(err))return err}return nil
}

最终会执行dt.getStreamingQueryAndDelteFunc。

这个函数会调用:

dt.produce(ctx, stream, result.GetIds())

simpleDelete也是调用这个函数。

complexDelete会根据expr查询出主键,然后根据主键进行删除数据。

7.总结

  • delete api根据expr走simpleDelete还是complexDelete。
  • complexDelete最终也会转化为simpleDelete。

相关文章:

milvus Delete API流程源码分析

Delete API执行流程源码解析 milvus版本:v2.3.2 整体架构: Delete 的数据流向: 1.客户端sdk发出Delete API请求。 from pymilvus import (connections,Collection, )print("start connecting to Milvus") connections.connect("default", host"192…...

CentOS使用Docker搭建Halo网站并实现无公网ip远程访问

🔥博客主页: 小羊失眠啦. 🎥系列专栏:《C语言》 《数据结构》 《C》 《Linux》 《Cpolar》 ❤️感谢大家点赞👍收藏⭐评论✍️ 前些天发现了一个巨牛的人工智能学习网站,通俗易懂,风趣幽默&…...

【JVM】垃圾回收算法

📝个人主页:五敷有你 🔥系列专栏:JVM ⛺️稳中求进,晒太阳 垃圾回收算法 Java是如何实现垃圾回收的呢?简单来说,垃圾回收就做两件事 找到内存中存活的对象释放不在存活对象的内存&…...

如何和将原始request的Header中的值传递给openfeign请求的Header? 以及又如何获取openfeign请求中Header中的值

如何和将原始request的Header中的值传递给openfeign请求的Header? 以及又如何获取openfeign请求中Header中的值 如何和将原始request的Header中的值传递给openfeign请求的Header参考 [https://www.jb51.net/article/282522.htm](https://www.jb51.net/article/28252…...

Flink 侧输出流(SideOutput)

🌸在平时大部分的 DataStream API 的算子的输出是单一输出,也就是某一种或者说某一类数据流,流向相同的地方。 🌸在处理不同的流中,除了 split 算子,可以将一条流分成多条流,这些流的数据类型也…...

C语言中关于#include的一些小知识

写代码的过程中,因为手误,重复包含了头文件 可以看到没有报错 如果是你自己编写的头文件,那么如果没加唯一包含标识的话,那么编译器会编译报错的。如果是系统自带的头文件,由于其每个头文件都加了特殊标识&#xff0c…...

DSP芯片 机器码下载方法 【主要 “扯” 用Uniflash下载的方法】

还是以德州仪器的TMS320F28335芯片为 “栗子”,说说这事儿。 编制好的程序经过开发环境可以编译成扩展名为 .out 文件,这个文件就是DSP可以运行机器码,我们把这个文件下载到 DSP芯片中的程序区, 下载好了,这个芯片原…...

速盾网络:CDN用几天关了可以吗?安全吗?

在使用CDN(内容分发网络)时,有时候会考虑暂时关闭或暂停使用CDN服务的情况。但是,对于关闭CDN服务的时间长短以及安全性问题,很多人可能存在疑问。在本文中,我们将讨论CDN使用中关闭几天是否安全以及相关注…...

MR混合现实情景实训教学系统在高空作业课堂中的应用

高空作业是一项高风险的工作,对于从业者来说,不仅需要具备专业的技能,还需要有丰富的实践经验。然而,传统的课堂教学往往只能通过理论讲解和模拟训练来传授知识,无法提供真实的实践环境。而MR混合现实情景实训教学系统…...

Windows系统中定时执行python脚本

背景:本地Windows系统指定目录下会有文件的修改新增,这些变化的文件需要定时的被上传到git仓库中,这样不需要每次变更手动上传了。 首先编写一个检测文件夹下文件变化并且上传git仓库的python脚本(确保你已经在E:\edc_workspace\data_edc_et…...

HashMap 源码学习-jdk1.8

1、一些常量的定义 这里针对MIN_TREEIFY_CAPACITY 这个值进行解释一下。 java8里面,HashMap 的数据结构是数组 (链表或者红黑树),每个数组节点下可能会存在链表和红黑树之间的转换,当同一个索引下面的节点超过8个时…...

WebStorm 2023:让您更接近理想的开发环境 mac/win版

JetBrains WebStorm 2023激活版下载是一款强大而智能的Web开发工具,专为提高开发人员的生产力而设计。这款编辑器提供了许多先进的代码编辑功能,以及一系列实用的工具和插件,可帮助您更快地编写、调试和测试代码。 WebStorm 2023软件获取 We…...

java面试题:数字与字母的映射表

前言 好记性不如烂笔头。 问题: 现在有一个数字与字母的映射表,且有以下规则: 映射表: 数字 字母 3 A 7 B 9 C 规则: 1.碰到当前数字时,使用字母替换,例如,3-> A 2.碰到当前数…...

Jmeter教程-JMeter 环境安装及配置

Jmeter教程 JMeter 环境安装及配置 在使用 JMeter 之前,需要配置相应的环境,包括安装 JDK 和获取 JMeter ZIP 包。 安装JDK 1.JDK下载 示例环境为Windows11环境,读者应根据实际环境下载JDK的安装包。 JDK下载地址: Java21 下载 …...

十大基础排序算法

排序算法分类 排序:将一组对象按照某种逻辑顺序重新排列的过程。 按照待排序数据的规模分为: 内部排序:数据量不大,全部存在内存中;外部排序:数据量很大,无法一次性全部存在内存中,…...

IP协议及相关技术协议

一、IP基本认识 1. IP的作用 IP在TCP/IP模型中处于网络层,网络层的主要作用是实现主机与主机之间的通信,而IP的作用是在复杂的网络环境中将数据包发送给最终目的主机。 2. IP与MAC的关系 简单而言,MAC的作用是实现“直连”的两个设备之通信…...

小红书x-s算法及补环境 单旋转验证码

前言 大家好呀!新的一年,先祝大家新年快乐咯.祝大家逆向,风控都一把过咯. 新年第一篇文章,后续会持续更新哦! 春晚见证了中国经济的新风口,今年春晚互联网企业赞助商就两家,小红书和京东.小红书类似国外的ins,有预感未来小红书会大火,所以写了这篇文章,有需要的加我,联系方式…...

代码检测规范和git提交规范

摘要:之前开发的项目,代码检测和提交规范都是已经配置好的,最近自己新建的项目就记录下相关配置过程。 1. ESlint配置 2013年6月创建开源项目,提供一个插件化的JavaScript代码检测工具,创建项目是生成的eslintrc.js文…...

Elasticsearch:什么是搜索引擎?

搜索引擎定义 搜索引擎是一种软件程序或系统,旨在帮助用户查找存储在互联网或特定数据库中的信息。 搜索引擎的工作原理是对各种来源的内容进行索引和编目,然后根据用户的搜索查询向用户提供相关结果列表。 搜索引擎对于希望快速有效地查找特定信息的用…...

人工智能几个关键节点:深蓝,AlphaGo,ChatGPT,Sora

近30年,人工智能几个关键节点:深蓝,AlphaGo,ChatGPT,Sora 深蓝: 1997年,深蓝击败卡斯帕罗夫的比赛是通过一系列复杂的算法和策略实现的。深蓝的开发团队使用了一种名为“暴力搜索”的技术&…...

WordPres Bricks Builder 前台RCE漏洞复现(CVE-2024-25600)

0x01 产品简介 Bricks Builder是一款用于WordPress的开发主题,提供直观的拖放界面,用于设计和构建WordPress网站。它使用户能够轻松创建自定义的网页布局和设计,无需编写或了解复杂的代码。Bricks Builder具有用户友好的界面和强大的功能,使用户可以通过简单的拖放操作添加…...

代码随想录算法训练营总结 | 慢慢总结,想起啥就先写上

二叉树总结 二叉树的结构 stauct TreeNode {int val;TreeNode* left;TreeNode* right; }二叉树的递归函数分析 二叉树的递归函数当做只有一个根节点,一个左子树,一个右节点的数去看,这看着是个废话, 其实很重要 回溯…...

基于开源模型对文本和音频进行情感分析

应用场景 从商品详情页爬取商品评论,对其做舆情分析;电话客服,对音频进行分析,做舆情分析; 通过开发相应的服务接口,进一步工程化; 模型选用 文本,选用了通义实验室fine-tune的st…...

SQL中为什么不要使用1=1

最近看几个老项目的SQL条件中使用了11,想想自己也曾经这样写过,略有感触,特别拿出来说道说道。 编写SQL语句就像炒菜,每一种调料的使用都可能会影响菜品的最终味道,每一个SQL条件的加入也可能会影响查询的执行效率。那…...

python 几种常见的音频数据读取、保存方式

1. soundfile 库的使用 soundfile库是一个Python库,主要用于读取和写入音频文件。它支持多种音频格式,包括WAV、AIFF、FLAC和OGG等。通过soundfile库,用户可以方便地将numpy数组存储到音频文件或者将音频文件加载到numpy数组中。此外&#x…...

关于msvcr120.dll丢失怎样修复的详细解决步骤方法分享,msvcr120.dll文件的相关内容

在电脑使用过程中,我们经常遇到各种系统错误,其中msvcr120.dll丢失是一个常见问题。msvcr120.dll文件是Visual C Redistributable for Visual Studio 2015/2017的一个组件,主要用于支持某些应用程序的正常运行。当电脑出现msvcr120.dll丢失情…...

简单几步通过DD工具把云服务器系统Linux改为windows

简单几部通过DD安装其他系统,当服务器的web控制台没有我们要装的系统,就需要通过DD(Linux磁盘)工具来更改系统,(已知支持KVM系统) 本文如何简单的更换系统,不通过web控制台来更换&a…...

使用 package.json 配置代理解决 React 项目中的跨域请求问题

使用 package.json 配置代理解决 React 项目中的跨域请求问题 当我们在开发前端应用时,经常会遇到跨域请求的问题。为了解决这个问题,我们可以通过配置代理来实现在开发环境中向后端服务器发送请求。 在 React 项目中,我们可以使用 package…...

生成 Let‘s Encrypt 免费证书

文章目录 1. 安装 acme.sh2. 添加云服务商安全访问密钥并授权管理DNS记录3. 当前 Shell 添加安全访问密钥变量4. 生成证书5. 拷贝证书6. 清理安全访问密钥变量7. 打开脚本自动更新 代码仓库地址:https://github.com/Neilpang/acme.sh 1. 安装 acme.sh yum -y insta…...

int128的实现(基本完成)

虽然有一个声明叫_int128但是这并不是C标准: long long 不够用?详解 __int128 - FReQuenter - 博客园 (cnblogs.com) 网络上去找int128的另类实现方法,发现几乎都是在介绍_int128的 然后我就自己想了个办法,当时还没学C&#xf…...