当前位置: 首页 > news >正文

管理流创建schema流程源码解析

一、简析

schema是pulsar重要的功能之一,现在就一起从源码的视角看下管理流创建schema时客户端和服务端的表现
在这里插入图片描述

客户端

客户端主要经历以下四个步骤

  1. 创建Schema实例

    根据数据类型创建相对应的实例,例如Avro创建AvroSchema、JSON创建JSONSchema等

  2. 获取处理Schema的对象

    管理流PulsarAdmin对象获取SchemasImpl对象,这个对象是专门处理所有schema相关的操作。除此之外PulsarAdmin对象还维护着Clusters、Brokers、Tenants等等管理维护集群的重要对象,通过这些对象可以很好的管理维护Pulsar集群

  3. 构造SchemaInfo

    通过Schema实例创建其对应的SchemaInfo信息,里面就包括这个schema的名字、schema的结构化信息、schema类型等等,最后SchemaInfo这个对象会转成字符串发到服务端

  4. 发送HTTP请求

    通过post请求将数据发到服务端,这里是通过Java Rest库javax.ws.rs-api进行处理的

服务端

服务端主要经历以下四个步骤

  1. 参数格式校验

    • 校验租户、命名空间的是否有效(判空、是否有特殊字符)
    • 从缓存中根据Topic获取其对应的TopicName对象
  2. 权限校验

    • 判断是否是Topic的owner
    • 判断当前用户是否有操作当前Topic的权限
  3. SchemaRegistry注册schema

    SchemaRegistryService是服务端处理所有schema相关的对象,而schema相关的读写操作是依赖它的成员SchemaStorage进行处理的,SchemaStorage的最终是通过Bookkeeper客户端对象发送写请求

  4. 写Bookkeeper

    通过LedgerHandle对象向Bookkeeper服务端发送写请求

小结

schame新建流程概括起来就是,客户端构造schema信息,服务端负责schema校验,bookkeeper负责schema的存储

二、客户端源码解析

源码跟踪

下面是通过管理流创建schema的样例代码,核心就是通过PulsarAdmin.schemas获取schema对象,这个schema对象负责所有客户端跟schema相关的操作,包括schema的增删改查等。通过方法的第二个参数可以看到是通过Schema接口提供的静态方法AVRO来构造Avro格式的schema对象,除此之外Schema接口还提供了诸如JSON、KeyValue、PROTOBUF等静态方法提供对应数据格式的schema对象,这里如果将这块构造schema对象逻辑抽成简单工厂模式可能会更合适些
在这里插入图片描述

接下来就进入createSchema方法,顾名思义可以知道这个方法就是用于创建schema的,第一个参数是topic,第二个参数是SchemaInfo对象,这个对象包含了所有要新建的schema信息,这里会将它转换为PostSchemaPayload对象传递给下一个方法。PostSchemaPayload是用来请求到服务端的参数
在这里插入图片描述

这个方法并不会有返回值,sync方法是处理异步结果对象,它在正常写成功情况下不会做任何操作,但如果有什么错误会往外抛出异常。这里核心逻辑是在createSchemaAsync方法
在这里插入图片描述

可以看到这个方法的返回值是个异步对象,146行这里会获取当前topic对应的TopicName对象,并通过schemaPath方法构造WebTarget对象,这个对象中就包含着要请求的HTTP地址,主要是根据当前Topic的版本来决定请求服务端哪个版本的处理方法。除此之外还可以看到有通过Entity.json方法将PostSchemaPayload对象转换为HTTP请求的参数对象,转换逻辑是javax.ws.rs-api这个网络库封装的,就不进行跟踪了
在这里插入图片描述

这里就是客户端最后发送的地方,request方法中还会发送前的安全相关检查,async方法基本上就说明本次HTTP请求是异步的,而post方法也能看得出,这是一个POST类型的HTTP请求,再往后就是将请求发送出去了
在这里插入图片描述

不知是否有人好奇参数WebTarget长什么样子,通过通过调试可以看到值为

/admin/v2/schemas/public/test-namespace-jytixthzgatgirem/test-multi-version-schema-one/schema

此值仅供学习参考,具体这个值的构造逻辑如下
在这里插入图片描述

小结

简单归纳如下

  • 通过Schema接口构造对应数据格式的schema对象,由此对象可得到schema相关的元信息SchemaInfo
  • 构造请求目标的HTTP地址
  • 通过javax.ws.rs-api提供的库发送异步HTTP请求到服务端

三、服务端源码解析

源码跟踪

服务端的接收逻辑在SchemasResource类,这个类在org.apache.pulsar.broker.admin包下,这个包下全是处理管理流相关的操作,如果有做pulsar平台化需求的,这个包下的相关逻辑值得一读。

再来看看postSchema方法,首先是validateTopicName方法,这个方法就是对入参进行判空、是否有特殊字符做检查;接下来就是核心方法postSchemaAsync,通过方法名可以推断出这是个异步处理schema写请求的方法
在这里插入图片描述

postSchemaAsync方法看似复杂,实际上核心的就是133行,其余的方法大概说一下,validateOwnershipAndOperationAsync方法主要检查当前用户是否有新建schema的操作权限,getSchemaCompatibilityStrategyAsyncWithoutAuth方法相对复杂一些,放到后面详细讲解。那么再看回133行,其中getSchemaRegistryService方法获取的是SchemaRegistryServiceImpl对象,顾名思义可以知道Pulsar的SchemaResistry相关的功能都是由它进行处理,现在先看它的putSchemaIfAbsent方法
在这里插入图片描述

SchemaStorage对象是SchemaRegistryServiceImpl的核心成员,负责schema存储相关的操作。在新建schema时会调用它的put方法进行创建;这里有个trimDeletedSchemaAndGetList方法,如果put方法在创建schema时有任何异常,则此方法会去删除该新建的schema,避免写"一半"的情况发生,某种意义上这也是一种回滚的设计。
在这里插入图片描述

这里的getAll方法很重要,会根据schema的id来查询是否已经存在当前schema,有的话则将版本号加1。处理完之后就调用put方法
在这里插入图片描述

这里没什么逻辑,继续往下跟踪
在这里插入图片描述

getSchemaLocator方法会构造LocatorEntry对象,调用putSchema
在这里插入图片描述

由于是初次创建schema,因此直接走到337行;如果这个topic已经创建过schema则会读取之前的schema信息再新增,同时把版本号自增

在这里插入图片描述

在这里可以看得到构造IndexEntry对象,这是消息的索引对象,后续用来加速查询schema
在这里插入图片描述

这个方法的内容就很眼熟了(bookkeeper相关内容),createLedger方法会先创建这个Ledger
在这里插入图片描述

在576行可以看到最终调用bookkeeper创建这个Ledger
在这里插入图片描述

再来看看addEntry方法,这里核心也是调用bookkeeper的ledgerHandle进行数据写入
在这里插入图片描述

这个方法是属于Bookkeeper客户端的逻辑了,通过方法注释可以看到,这个方法负责将数据异步写入到一个打开的Ledger。Bookkeeper相关的逻辑后续在单独写post进行讲解
在这里插入图片描述

小结

简单归纳如下

  1. 参数格式校验、操作权限校验
  2. 查询当前Topic是否已经创建过schema,有则以插入时版本号自增
  3. 如果是初次创建Schema,则调用bookkeeper创建Ledger
  4. 往这个Schema对应的Ledger内插入schema元数据信息

四、其他

序列化对象创建流程

现在再专门来看看序列化对象的创建过程,回到开头管理流创建schema的地方,Schema.AVRO方法是咱们本次要看的
在这里插入图片描述

通过注释可以看到,此静态方法是创建一个Avro类型的schema对象,getDefaultImplementation方法是获取实现类(饿汉单例设计模式),而newAvroSchema方法才是本次要看的
在这里插入图片描述

继续往下跟踪
在这里插入图片描述

获取对应处理的类加载器,并通过对应的类加载器创建AvroSchema实例
在这里插入图片描述

54行是核心,其他的都是赋值操作
在这里插入图片描述

super调用父类构造函数做赋值操作,还是继续看
在这里插入图片描述

继续跟踪parse逻辑
在这里插入图片描述

FACTORY.createParser方法是jackson的方法,用于创建JsonParser对象的;因此继续跟踪parse方法
在这里插入图片描述

1471行可以看到返回了我们想要的Schema对象,那么Schema.parse方法就是重中之重
在这里插入图片描述

这个方法是核心,本身会递归的进行解析赋值给schema对象
在这里插入图片描述

相信读者读到这里也好奇schema长什么样,因此提供下图让读者感受下,能大概推测得出来这里已经涵盖了schema的结构信息了
在这里插入图片描述

getSchemaCompatibilityStrategyAsyncWithoutAuth方法

AdminResource#getSchemaCompatibilityStrategyAsyncWithoutAuth方法是在服务端处理schema创建请求阶段会调用的方法,现在就一起跟踪看看

731行和739行分别是获取Topic级别和Namespace级别的schema兼容策略,如果没有定义则默认自动更新。例如Topic A之前已经创建过schema1,那么如果此时再发起schema2创建请求,则服务端会继续保存并且生效schema2,只不过它的版本号会进行累加,当然,也可以配置为不支持schema策略不支持更新,一旦确定了后就不允许再变更
在这里插入图片描述

五、总结

相信大家对schema创建的流程已经很清楚了,再次简单归纳下

  1. 客户端根据用户定义的结构信息创建对应的Schema对象,并将结构信息以HTTP请求发给服务端
  2. 服务端检测并根据Schema兼容策略做相对应的处理,一般情况下会调用Bookkeeper创建Ledger以及Entry
  3. Bookkeeper将此Schema数据持久化到磁盘,相当于Schema信息会被Bookkeeper当作一条消息进行存储

这基本上就是全部内容,当然细节感兴趣的小伙伴可以自行跟踪代码,相信你会有更多收获~

相关文章:

管理流创建schema流程源码解析

一、简析 schema是pulsar重要的功能之一,现在就一起从源码的视角看下管理流创建schema时客户端和服务端的表现 客户端 客户端主要经历以下四个步骤 创建Schema实例 根据数据类型创建相对应的实例,例如Avro创建AvroSchema、JSON创建JSONSchema等 获取…...

【iOS】iOS内存五大分区

iOS内存五大分区 总揽 iOS中,内存主要分为五大区域:栈区,堆区,全局区/静态区,常量区和代码区。总览图如下。 这个图我觉得更好记,因为下面是低地址,上面是高地址,是比较符合日常…...

【项目实战】—— 高并发内存池

文章目录 什么是高并发内存池?项目介绍一、项目背景二、项目目标三、核心组件四、关键技术五、应用场景六、项目优势 什么是高并发内存池? 高并发内存池是一种专门设计用于高并发环境下的内存管理机制。它的原型是Google的一个开源项目tcmalloc&#xff…...

二叉搜索树的第 k 大的节点

题目描述 给定一棵二叉搜索树,请找出其中第 k 大的节点。 解题基本知识 二叉搜索树(Binary Search Tree)又名二叉查找树、二叉排序树。它是一棵空树,或者是具有下列性质的二叉树: 若它的左子树不空,则左子…...

利用langchain 做大模型 Few-shot Learning 提示,包括固定和向量相似的动态样本筛选

文章目录 few-shotFixed Examples 固定样本Dynamic few-shot prompting 动态样本提示辅助参考资料 few-shot 相比大模型微调,在有些情况下,我们更想使用 Few-shot Learning 通过给模型喂相关样本示例,让模型能够提升相应任务的能力。 固定样…...

基于python的百度迁徙迁入、迁出数据分析(五)

终于在第五篇文章我们进入了这个系列的正题:数据分析 这里我选择上海2024年5月1日——5月5日的迁入、迁出数据作为分析的基础,首先选择节假日的数据作为分析的原因呢,主要是节假日人们出行目的比较单一(出游、探亲)&a…...

SpringBoot 如何处理跨域请求

SpringBoot 处理跨域请求,通常是通过配置全局的 CORS(跨源资源共享)策略来实现的。CORS 是一种机制,它使用额外的 HTTP 头部来告诉浏览器,让运行在一个 origin (domain) 上的 web 应用被准许访问来自不同源服务器上的指…...

大数据技术基础编程、实验和案例----大数据课程综合实验案例

一、实验目的 (1)熟悉Linux系统、MySQL、Hadoop、HBase、Hive、Sqoop、R、Eclipse等系统和软件的安装和使用; (2)了解大数据处理的基本流程; (3)熟悉数据预处理方法; (4)熟悉在不同类型数据库之…...

微信小程序-获取手机号:HttpClientErrorException: 412 Precondition Failed: [no body]

问题: 412 异常就是你的请求参数获取请求头与服务器的不符,缺少请求体! 我的问题: 我这里获取微信手机号的时候突然给我报错142,但是代码用的是原来的代码,换了一个框架就噶了! 排查问题&am…...

大数据核心概念与技术架构简介

大数据基本概念 大数据是指无法在一定时间范围内用常规软件工具进行捕捉、管理和处理的数据集合,是具有更强的决策力、洞察发现力和流程优化能力的海量、高增长率和多样化的信息资产。 大数据特征: 数据量大:一般以P(1000个TB&a…...

快排 谁在中间

原题 Whos in the Middle FJ is surveying his herd to find the most average cow. He wants to know how much milk this median cow gives: half of the cows give as much or more than the median; half give as much or less. FJ正在调查他的牛群,以找到最…...

ORA-00911: invalid character

场景: 调用接口查询oracle的数据库数据时报错ORA-00911: invalid character,但是sql语句没有问题放在navicat控制台中运行也没有问题,但是代码中跑就会报无效字符集 分析: 代码中Oracle的语法解析器比较严格,比如句…...

Pytorch实现线性回归Linear Regression

借助 PyTorch 实现深度神经网络 - 线性回归 - 第 2 周 | Coursera 线性回归预测 用PyTorch实现线性回归模块 创建自定义模块(内含一个线性回归) 训练线性回归模型 对于线性回归,特定类型的噪声是高斯噪声 平均损失均方误差函数&#xff1a…...

十八次(虚拟主机与vue项目、samba磁盘映射、nfs共享)

1、虚拟主机搭建环境准备 将原有的nginx.conf文件备份 [rootserver ~]# cp /usr/local/nginx/conf/nginx.conf /usr/local/nginx/conf/nginx.conf.bak[rootserver ~]# grep -Ev "#|^$" /usr/local/nginx/conf/nginx.conf[rootserver ~]# grep -Ev "#|^$"…...

P1340 兽径管理 题解|最小生成树

题目大意 洛谷中链接 推荐文章:并查集入门 原文 约翰农场的牛群希望能够在 N N N 个草地之间任意移动。草地的编号由 1 1 1 到 N N N。草地之间有树林隔开。牛群希望能够选择草地间的路径,使牛群能够从任一 片草地移动到任一片其它草地。 牛群可在…...

Python,Maskrcnn训练,cannot import name ‘saving‘ from ‘keras.engine‘ ,等问题集合

Python版本3.9&#xff0c;tensorflow2.11.0&#xff0c;keras2.11.0 问题一、module keras.engine has no attribute Layer Traceback (most recent call last):File "C:\Users\Administrator\Desktop\20240801\代码\test.py", line 16, in <module>from mrc…...

Linux常用工具

文章目录 tar打包命令详解unzip命令&#xff1a;解压zip文件vim操作详解netstat详解df命令详解ps命令详解find命令详解 tar打包命令详解 tar命令做打包操作 当 tar 命令用于打包操作时&#xff0c;该命令的基本格式为&#xff1a; tar [选项] 源文件或目录此命令常用的选项及…...

AI未来的发展如何

AI&#xff08;人工智能&#xff09;的发展前景非常广阔&#xff0c;随着技术的不断进步和应用场景的不断拓展&#xff0c;AI将在多个领域发挥重要作用。以下是对AI发展前景的详细分析&#xff1a; 一、技术突破与创新 生成式AI的兴起&#xff1a;以ChatGPT为代表的生成式AI技…...

若依替换首页上的logo

...

sed的使用示例

场景:使用sed将多个空格变成单空格,再使用cut来切分得到需要的结果 得到后面这个文件名: ls ./ drwxr-x— 2 root root 6 Jul 18 9:00 7b40f1412d83c1524af7977593607f15 drwxr-x— 2 root root 6 Jul 18 14:00 50af29cef2c65a9d28905a3ce831bcb7 drwxr-x— 2 root root 6 Jul…...

学历不是障碍:大专生如何成功进入软件测试行业

摘要&#xff1a; 在当今技术驱动的职场环境中&#xff0c;软件测试已成为一个关键的职业领域。尽管许多人认为高学历是进入这一行业的先决条件&#xff0c;但实际上&#xff0c;大专学历的学生同样有机会在软件测试领域取得成功。本文将探讨大专生如何通过技能提升、实践经验和…...

文件解析漏洞—IIS解析漏洞—IIS6.X

目录 方式 1&#xff1a;目录解析 方式 2&#xff1a;畸形文件解析 方式 3&#xff1a;PUT 上传漏洞&#xff08;123.asp;.jpg 解析成 asp&#xff09; 环境&#xff1a;Windows server 2003 添加 IIS 管理工具——打开 IIS——添加网站 创建完成之后&#xff0c;右击创建的…...

Sqlmap中文使用手册 - Brute force模块参数使用

目录 1. Brute force模块的帮助文档2. 各个参数的介绍2.1 --common-tables2.2 --common-columns2.3 --common-files 1. Brute force模块的帮助文档 Brute force:These options can be used to run brute force checks--common-tables Check existence of common tables--c…...

ubuntu20.04 开源鸿蒙源码编译配置

替换华为源 sudo sed -i "shttp://.*archive.ubuntu.comhttp://repo.huaweicloud.comg" /etc/apt/sources.list && sudo sed -i "shttp://.*security.ubuntu.comhttp://repo.huaweicloud.comg" /etc/apt/sources.list 安装依赖工具 如果是ubun…...

程序员面试 “八股文”在实际工作中是助力、阻力还是空谈?

“八股文”在实际工作中是助力、阻力还是空谈&#xff1f; 作为现在各类大中小企业面试程序员时的必问内容&#xff0c;“八股文”似乎是很重要的存在。但“八股文”是否能在实际工作中发挥它“敲门砖”应有的作用呢&#xff1f;有IT人士不禁发出疑问&#xff1a;程序员面试考…...

广告从用户点击开始到最终扣费的过程

用户点击广告 用户在网页或移动应用上看到广告&#xff0c;并点击广告。这一事件触发了整个广告处理流程。 广告请求触发 用户点击广告后&#xff0c;客户端&#xff08;如浏览器、APP&#xff09;向广告系统发送广告点击请求。请求通常包含以下信息&#xff1a; 用户ID 设备信…...

Linux系统编程-信号进程间通信

目录 异步&#xff08;Asynchronous&#xff09; 信号 数据结构 1.kill 2.alarm 3.pause 4.setitimer 5.abort 信号集(sigset_t类型) 1.sigemptyset 2.sigfillset 3.sigaddset 4.sigdelset 5.sigismember 信号屏蔽 1.sigprocmask 2.sigpending 3.sigsus…...

Attention Module (SAM)是什么?

SAM&#xff08;Spatial Attention Module&#xff0c;空间注意力模块&#xff09;是一种在神经网络中应用的注意力机制&#xff0c;特别是在处理图像数据时&#xff0c;它能够帮助模型更好地关注输入数据中不同空间位置的重要性。以下是关于SAM的详细解释&#xff1a; 1. 基本…...

【C语言】堆排序

堆排序即利用堆的思想来进行排序&#xff0c;总共分为两个步骤&#xff1a; 1. 建堆 升序&#xff1a;建大堆 降序&#xff1a;建小堆 原因分析&#xff1a; 若升序建小堆时间复杂度是O(N^2) 升序建大堆&#xff0c;时间复杂度O&#xff08;N*logN&#xff09; 所以升序建大堆…...

ntp服务重启报错Failed to restart ntpd.service: Unit is masked.

问题概述&#xff1a; 重启ntp服务报错Failed to restart ntpd.service: Unit is masked&#xff0c;使用systemctl unmask ntpd.service命令关闭屏蔽还是报错Failed to restart ntpd.service: Unit is masked 解决方法&#xff1a; 重装ntp服务 yum remove ntpyum install…...

佛山专业网站营销/阿里大数据分析平台

领导力的36个关键 下载没有交易权 与生产线经理不同&#xff0c;您通常不以产品负责人的身份管理开发团队和利益相关者 &#xff0c;个人也不向您报告。 因此&#xff0c;您没有任何交易权 &#xff1a;您无法告诉别人该怎么做&#xff1b; 您不能为其分配任务&#xff1b; 并且…...

公司变更股东要交税吗/北京seo顾问服务

在上周的 Vue.js 伦敦大会上&#xff0c;Vue.js 作者尤雨溪简要介绍了 Vue 下一个主要版本要发布的内容&#xff0c;9 月 30 日&#xff0c;尤雨溪在 medium 个人博客上发布了 Vue 3.0 的开发路线&#xff0c;我们不妨看看 Vue 3.0 将会有怎样的发展。 兼容 按照尤雨溪的说法…...

最火的服务器托管/名词解释seo

2.搭建双主双从 编号 角色 Ip地址 机器名 1 Master1 192.168.119.131 Hadoop2 2 Slave1 192.168.119.132 Hadoop3 3 Master2 192.168.119.133 Hadoop1 4 Slave2 192.168.119.134 Hadoop4 2.1修改配置文件 修改四台服务器的/etc/my.cnf文件 ①Master1 [mysqld] server-id1 #…...

毕业设计代做网站推荐/杭州seo软件

该楼层疑似违规已被系统折叠 隐藏此楼查看此楼package testOfProject;import javax.swing.*;import java.awt.*;import java.awt.event.*;public class ThreadView extends JFrame implements ActionListener {JPanel jp1;JButton jb1, jb2;public static void main(String[] a…...

上海网站建站/学seo的培训学校

迭代协议 可迭代协议&#xff08;The iterable protocol&#xff09; 和 迭代器协议&#xff08;The iterator protocol&#xff09;是对 ECMAScript 2015 的补充&#xff0c;不是新的内置或语法&#xff0c;仅仅是协议。可以被任何遵循某些约定的对象来实现。 可迭代协议 可迭…...

南宁手机建站公司/淘宝关键词优化技巧教程

点击上方蓝字关注我们1前言曾几何时&#xff0c;”云”还是指天上飘的那一朵朵白色的雾团&#xff0c;现在互联网上家家都说自己是”xx云”。“云”这个词&#xff0c;已经被赋上了新的含义。其实真正在做”云”的企业没几家。这篇文章会告诉大家&#xff0c;究竟什么是”云”&…...