导入JDBC元数据到Apache Atlas
前言
前期实现了导入MySQL元数据到Apache Atlas, 由于是初步版本,且功能参照Atlas Hive Hook,实现的不够完美
本期对功能进行改进,实现了导入多种关系型数据库元数据到Apache Atlas
数据库schema与catalog
按照SQL
标准的解释,在SQL
环境下Catalog
和Schema
都属于抽象概念,可以把它们理解为一个容器或者数据库对象命名空间中的一个层次,主要用来解决命名冲突问题。从概念上说,一个数据库系统包含多个Catalog
,每个Catalog
又包含多个Schema
,而每个Schema
又包含多个数据库对象(表、视图、字段等),反过来讲一个数据库对象必然属于一个Schema
,而该Schema
又必然属于一个Catalog
,这样我们就可以得到该数据库对象的完全限定名称,从而解决命名冲突的问题了;例如数据库对象表的完全限定名称就可以表示为:Catalog
名称.Schema
名称.表名称。这里还有一点需要注意的是,SQL
标准并不要求每个数据库对象的完全限定名称是唯一的。
从实现的角度来看,各种数据库系统对Catalog
和Schema
的支持和实现方式千差万别,针对具体问题需要参考具体的产品说明书,比较简单而常用的实现方式是使用数据库名作为Catalog
名,使用用户名作为Schema
名,具体可参见下表:
表1 常用数据库
供应商 | Catalog支持 | Schema支持 |
---|---|---|
Oracle | 不支持 | Oracle User ID |
MySQL | 不支持 | 数据库名 |
MS SQL Server | 数据库名 | 对象属主名,2005版开始有变 |
DB2 | 指定数据库对象时,Catalog部分省略 | Catalog属主名 |
Sybase | 数据库名 | 数据库属主名 |
Informix | 不支持 | 不需要 |
PointBase | 不支持 | 数据库名 |
原文:https://www.cnblogs.com/ECNB/p/4611309.html
元数据模型层级抽象
不同的关系型数据库,其数据库模式有所区别,对应与下面的层级关系
- Datasource -> Catalog -> Schema -> Table -> Column
- Datasource -> Catalog -> Table -> Column
- Datasource -> Schema -> Table -> Column
元数据转换设计
提供元数据
借鉴Apache DolphinScheduler中获取Connection
的方式,不多赘述。
public Connection getConnection(DbType dbType, ConnectionParam connectionParam) throws ExecutionException {BaseConnectionParam baseConnectionParam = (BaseConnectionParam) connectionParam;String datasourceUniqueId = DataSourceUtils.getDatasourceUniqueId(baseConnectionParam, dbType);logger.info("Get connection from datasource {}", datasourceUniqueId);DataSourceClient dataSourceClient = uniqueId2dataSourceClientCache.get(datasourceUniqueId, () -> {Map<String, DataSourceChannel> dataSourceChannelMap = dataSourcePluginManager.getDataSourceChannelMap();DataSourceChannel dataSourceChannel = dataSourceChannelMap.get(dbType.getDescp());if (null == dataSourceChannel) {throw new RuntimeException(String.format("datasource plugin '%s' is not found", dbType.getDescp()));}return dataSourceChannel.createDataSourceClient(baseConnectionParam, dbType);});return dataSourceClient.getConnection();}
转换元数据
- 元数据模型
创建数据库的元数据模型
private AtlasEntityDef createJdbcDatabaseDef() {AtlasEntityDef typeDef = createClassTypeDef(DatabaseProperties.JDBC_TYPE_DATABASE,Collections.singleton(DatabaseProperties.ENTITY_TYPE_DATASET),createOptionalAttrDef(DatabaseProperties.ATTR_URL, "string"),createOptionalAttrDef(DatabaseProperties.ATTR_DRIVER_NAME, "string"),createOptionalAttrDef(DatabaseProperties.ATTR_PRODUCT_NAME, "string"),createOptionalAttrDef(DatabaseProperties.ATTR_PRODUCT_VERSION, "string"));typeDef.setServiceType(DatabaseProperties.ENTITY_SERVICE_TYPE);return typeDef;
}
创建数据库模式的元数据模型
private AtlasEntityDef createJdbcSchemaDef() {AtlasEntityDef typeDef = AtlasTypeUtil.createClassTypeDef(SchemaProperties.JDBC_TYPE_SCHEMA,Collections.singleton(SchemaProperties.ENTITY_TYPE_DATASET));typeDef.setServiceType(SchemaProperties.ENTITY_SERVICE_TYPE);typeDef.setOptions(new HashMap<>() {{put("schemaElementsAttribute", "tables");}});return typeDef;
}
创建数据库表的元数据模型
private AtlasEntityDef createJdbcTableDef() {AtlasEntityDef typeDef = createClassTypeDef(TableProperties.JDBC_TYPE_TABLE,Collections.singleton(TableProperties.ENTITY_TYPE_DATASET),createOptionalAttrDef(TableProperties.ATTR_TABLE_TYPE, "string"));typeDef.setServiceType(BaseProperties.ENTITY_SERVICE_TYPE);typeDef.setOptions(new HashMap<>() {{put("schemaElementsAttribute", "columns");}});return typeDef;
}
创建数据库列的元数据模型
private AtlasEntityDef createJdbcColumnDef() {AtlasEntityDef typeDef = createClassTypeDef(ColumnProperties.JDBC_TYPE_COLUMN,Collections.singleton(ColumnProperties.ENTITY_TYPE_DATASET),createOptionalAttrDef(ColumnProperties.ATTR_COLUMN_TYPE, "string"),createOptionalAttrDef(ColumnProperties.ATTR_IS_PRIMARY_KEY, "string"),createOptionalAttrDef(ColumnProperties.ATTR_COLUMN_IS_NULLABLE, "string"),createOptionalAttrDef(ColumnProperties.ATTR_COLUMN_DEFAULT_VALUE, "string"),createOptionalAttrDef(ColumnProperties.ATTR_COLUMN_AUTO_INCREMENT, "string"));typeDef.setServiceType(BaseProperties.ENTITY_SERVICE_TYPE);HashMap<String, String> options = new HashMap<>() {{put("schemaAttributes", "[\"name\", \"isPrimaryKey\", \"columnType\", \"isNullable\" , \"isAutoIncrement\", \"description\"]");}};typeDef.setOptions(options);return typeDef;
}
创建实体之间的关系模型
private List<AtlasRelationshipDef> createAtlasRelationshipDef() {String version = "1.0";// 数据库和模式的关系AtlasRelationshipDef databaseSchemasDef = createRelationshipTypeDef(BaseProperties.RELATIONSHIP_DATABASE_SCHEMAS,BaseProperties.RELATIONSHIP_DATABASE_SCHEMAS,version, COMPOSITION, AtlasRelationshipDef.PropagateTags.NONE,createRelationshipEndDef(BaseProperties.JDBC_TYPE_DATABASE, "schemas", SET, true),createRelationshipEndDef(BaseProperties.JDBC_TYPE_SCHEMA, "database", SINGLE, false));databaseSchemasDef.setServiceType(BaseProperties.ENTITY_SERVICE_TYPE);AtlasRelationshipDef databaseTablesDef = createRelationshipTypeDef(BaseProperties.RELATIONSHIP_DATABASE_TABLES,BaseProperties.RELATIONSHIP_DATABASE_TABLES,version, AGGREGATION, AtlasRelationshipDef.PropagateTags.NONE,createRelationshipEndDef(BaseProperties.JDBC_TYPE_DATABASE, "tables", SET, true),createRelationshipEndDef(BaseProperties.JDBC_TYPE_TABLE, "database", SINGLE, false));databaseTablesDef.setServiceType(BaseProperties.ENTITY_SERVICE_TYPE);// 模式和数据表的关系// 注意 schema 已经被使用, 需要更换否则会冲突, 例如改为 Jschema(jdbc_schema)AtlasRelationshipDef schemaTablesDef = createRelationshipTypeDef(BaseProperties.RELATIONSHIP_SCHEMA_TABLES,BaseProperties.RELATIONSHIP_SCHEMA_TABLES,version, AGGREGATION, AtlasRelationshipDef.PropagateTags.NONE,createRelationshipEndDef(BaseProperties.JDBC_TYPE_SCHEMA, "tables", SET, true),createRelationshipEndDef(BaseProperties.JDBC_TYPE_TABLE, "Jschema", SINGLE, false));schemaTablesDef.setServiceType(BaseProperties.ENTITY_SERVICE_TYPE);// 表和数据列的关系AtlasRelationshipDef tableColumnsDef = createRelationshipTypeDef(BaseProperties.RELATIONSHIP_TABLE_COLUMNS,BaseProperties.RELATIONSHIP_TABLE_COLUMNS,version, COMPOSITION, AtlasRelationshipDef.PropagateTags.NONE,createRelationshipEndDef(BaseProperties.JDBC_TYPE_TABLE, "columns", SET, true),createRelationshipEndDef(BaseProperties.JDBC_TYPE_COLUMN, "table", SINGLE, false));tableColumnsDef.setServiceType(BaseProperties.ENTITY_SERVICE_TYPE);return Arrays.asList(databaseSchemasDef, databaseTablesDef, schemaTablesDef, tableColumnsDef);
}
-
提取元数据
不再赘述
-
转换元数据
使用工厂模式,提供不同类型的元数据转换方式
public interface JdbcTransferFactory {JdbcTransfer getTransfer(DatabaseMetaData metaData, AtlasClientV2 client);boolean supportType(String type);String getName();
}
List ignorePatterns 用来过滤不想导入的数据库元数据,例如mysql
的information_schema
public interface JdbcTransfer {void transfer();JdbcTransfer setIgnorePatterns(List<Pattern> ignorePatterns);
}
举例:JdbcMysqlTransfer 和 MysqlTransferFactory
@AutoService(JdbcTransferFactory.class)
public class MysqlTransferFactory implements JdbcTransferFactory {public static final String MYSQL = "mysql";@Overridepublic JdbcTransfer getTransfer(DatabaseMetaData metaData, AtlasClientV2 client) {return new JdbcMysqlTransfer(metaData, client);}@Overridepublic boolean supportType(String type) {return MYSQL.equalsIgnoreCase(type);}@Overridepublic String getName() {return MYSQL;}
}
public class JdbcMysqlTransfer implements JdbcTransfer {private final Jdbc jdbc;private final AtlasService atlasService;private List<Pattern> ignorePatterns;public JdbcMysqlTransfer(DatabaseMetaData metaData, AtlasClientV2 client) {this.jdbc = new Jdbc(new JdbcMetadata(metaData));this.atlasService = new AtlasService(client);this.ignorePatterns = Collections.emptyList();}@Overridepublic JdbcTransfer setIgnorePatterns(List<Pattern> ignorePatterns) {this.ignorePatterns = ignorePatterns;return this;}private boolean tableIsNotIgnored(String tableName) {return ignorePatterns.stream().noneMatch(regex -> regex.matcher(tableName).matches());}@Overridepublic void transfer() {// 1.数据库实体转换DatabaseTransfer databaseTransfer = new DatabaseTransfer(atlasService);AtlasEntity databaseEntity = databaseTransfer.apply(jdbc);// 2.表实体转换String catalog = (String) databaseEntity.getAttribute(BaseProperties.ATTR_NAME);List<AtlasEntity> tableEntities = jdbc.getTables(catalog, catalog).parallelStream().filter(jdbcTable -> tableIsNotIgnored(jdbcTable.getTableName())).map(new TableTransfer(atlasService, databaseEntity)).toList();// 3.列转换for (AtlasEntity tableEntity : tableEntities) {String tableName = (String) tableEntity.getAttribute(BaseProperties.ATTR_NAME);List<JdbcPrimaryKey> primaryKeys = jdbc.getPrimaryKeys(catalog, tableName);jdbc.getColumns(catalog, catalog, tableName).parallelStream().forEach(new ColumnTransfer(atlasService, tableEntity, primaryKeys));}}}
- 元数据存入Atlas
public class DatabaseTransfer implements Function<Jdbc, AtlasEntity> {private final AtlasService atlasService;public DatabaseTransfer(AtlasService atlasService) {this.atlasService = atlasService;}@Overridepublic AtlasEntity apply(Jdbc jdbc) {String userName = jdbc.getUserName();String driverName = jdbc.getDriverName();String productName = jdbc.getDatabaseProductName();String productVersion = jdbc.getDatabaseProductVersion();String url = jdbc.getUrl();String urlWithNoParams = url.contains("?") ? url.substring(0, url.indexOf("?")) : url;String catalogName = urlWithNoParams.substring(urlWithNoParams.lastIndexOf("/") + 1);// 特殊处理 Oracleif (productName.equalsIgnoreCase("oracle")){catalogName = userName.toUpperCase();urlWithNoParams = urlWithNoParams + "/" + catalogName;}DatabaseProperties properties = new DatabaseProperties();properties.setQualifiedName(urlWithNoParams);properties.setDisplayName(catalogName);properties.setOwner(userName);properties.setUrl(url);properties.setDriverName(driverName);properties.setProductName(productName);properties.setProductVersion(productVersion);// 1.创建Atlas EntityAtlasEntity atlasEntity = new AtlasEntity(DatabaseProperties.JDBC_TYPE_DATABASE, properties.getAttributes());// 2.判断是否存在实体, 存在则填充GUIDMap<String, String> searchParam = Collections.singletonMap(DatabaseProperties.ATTR_QUALIFIED_NAME, urlWithNoParams);Optional<AtlasEntityHeader> entityHeader = atlasService.checkAtlasEntityExists(DatabaseProperties.JDBC_TYPE_DATABASE, searchParam);entityHeader.ifPresent(header -> atlasEntity.setGuid(header.getGuid()));// 3,存储或者更新到Atlas中if (entityHeader.isPresent()){atlasService.createAtlasEntity(new AtlasEntity.AtlasEntityWithExtInfo(atlasEntity));}else {AtlasEntityHeader header = atlasService.createAtlasEntity(new AtlasEntity.AtlasEntityWithExtInfo(atlasEntity));atlasEntity.setGuid(header.getGuid());}return atlasEntity;}
}
效果展示
- 元数据类型定义
- 测试导入元数据
由于mysql没有采用schema,因此jdbc_schema为空
如图所示,可以清晰的了解mysql数据库中demo数据库的数据表内容
数据表元数据,qualifiedName使用数据库连接url
.表名
如同所示,数据表内各个列的元数据;可以清晰的了解该数据表的各个字段信息
相关文章:

导入JDBC元数据到Apache Atlas
前言 前期实现了导入MySQL元数据到Apache Atlas, 由于是初步版本,且功能参照Atlas Hive Hook,实现的不够完美 本期对功能进行改进,实现了导入多种关系型数据库元数据到Apache Atlas 数据库schema与catalog 按照SQL标准的解释,…...

大数据项目——基于Django/协同过滤算法的房源可视化分析推荐系统的设计与实现
大数据项目——基于Django/协同过滤算法的房源可视化分析推荐系统的设计与实现 技术栈:大数据爬虫/机器学习学习算法/数据分析与挖掘/大数据可视化/Django框架/Mysql数据库 本项目基于 Django框架开发的房屋可视化分析推荐系统。这个系统结合了大数据爬虫、机器学…...

[网鼎杯 2020 朱雀组]phpweb1
提示 call_user_func()函数先通过php内置函数来进行代码审计绕过system(##不止一种方法) 拿到题目养成一个好的习惯先抓个包 从抓到的包以及它首页的报错来看,这里死活会post传输两个参数func以及p func传输函数,而p则是传输参数的…...

深度学习之注意力机制
注意力机制与外部记忆 注意力机制与记忆增强网络是相辅相成的,神经网络去从内存中或者外部记忆中选出与当前输入相关的内容时需要注意力机制,而在注意力机制的很多应用场景中,我们的外部信息也可以看作是一个外部的记忆 这是一个阅读理解任务…...

WordPress:解决xmlrpc.php被扫描爆破的风险
使用WordPress的朋友都知道,一些【垃圾渣渣】会利用xmlrpc.php文件来进行攻击,绕过WP后台错误登录次数限制进行爆破。虽然密码复杂的极难爆破,但及其占用服务器资源。 方法一、利用宝塔防火墙(收费版) 一般可以直接使…...

Fiddler抓包模拟器(雷电模拟器)
Fiddler设置 List item 打开fiddler,的options 点击OK,重启fiddler 模拟器 更改网络设置 IP可以在电脑上终端上查看 然后在模拟器浏览器中输入IP:端口 安装证书...
RepidJson将内容写入文件
使用 RapidJSON 将内容写入文件的步骤如下: 创建一个 rapidjson::Document 对象,将需要写入文件的内容存储到其中。创建一个 rapidjson::StringBuffer 对象来保存 JSON 字符串。将 rapidjson::Document 对象转换为 JSON 字符串,并将其放入 r…...

Endnote使用教程
原由 最近要进行开题报告,要求不低于60文献的阅读与引用,单独插入引入我觉得是非常繁琐的事情,所以就借助Endnote这个工具,减少我们的工作量。 使用方法 第一步:先新建一个数据库,这样子可以在这个数据库…...
java中用Thead创建线程和用Runnable创建线程的区别是什么?
在 Java 中,创建线程的两种主要方式是通过继承 Thread 类和通过实现 Runnable 接口。下面是它们之间的主要区别: 1. 继承 Thread 类: class MyThread extends Thread {public void run() {// 线程执行的代码} }// 创建并启动线程 MyThread …...

0013Java程序设计-基于Vue的上课签到系统的设计与实现
文章目录 **摘 要**目录系统设计4.2学生签到4.3 签到信息列表4.4 用户信息管理5.1系统登录5.1.1 登录5.1.2 清除用户登记记录5.1.3 登录拦截 5.2用户管理5.2.2 用户添加5.2.3 用户编辑5.2.4 用户删除5.2.5 用户分页 5.3签到信息5.3.1签到信息列表 5.4学生签到5.4.1学生签到 开发…...

2.修改列名与列的数据类型
修改字段名与字段数据类型 1.修改字段名 有时,在我们建好一张表后会突然发现,哎呀!字段名貌似写错了!怎么办?要删了表再重新建一个新表吗?还是要删了这个字段再新建一个新的字段? 都不用&…...

[Firefly-Linux] RK3568 Ubuntu固件分区详解
RK为了方便开发与产品定制,自己定义了一套固件的分区,这些分区信息存放在parameter.txt文件中,Firefly参考这个文件定义了自己的Ubuntu分区,文件为parameter-ubuntu.txt,存放于Linux_SDK的device/rockchip/rk356x目录下…...

SpringBoot项目访问resources下的静态资源
1.新建一个配置文件夹,放配置类 2.编辑 WebMvcConfig.java package com.southwind.configuration;import org.springframework.context.annotation.Configuration; import org.springframework.web.servlet.config.annotation.ResourceHandlerRegistry; import or…...
Qt之面试经验
1.恒生芸擎网络 技术没怎么问,一面问对方工作日常会涉及的一些东西(自动发布),二面公司流程,三面其他(没发offer) 2.光珀智能科技 涉及AI算法落地,问了点基础问题,比如…...
数据库基础概念与范式反范式总结
文章目录 一、基本概念1、属性2、元组3、关系4、超键5、候选键6、主键7、主属性8、外键9、函数依赖完全依赖 二、数据库范式1、第一范式(1NF)2、第二范式(2NF)3、第三范式(3NF)4、巴斯-科德范式(…...
tanstack/react-query使用手册
1. useQuery useQuery的使用一、data是后端成功返回的数据, 第一次的值为undefined 二、isLoading是指数据是否正在加载的状态,通常用于判断请求是否还在进行中。当isLoading为true时,表示数据正在加载中,当isLoading为false时&a…...
camera2对摄像头编码h264
MediaCodec编码摄像头数据 前置:保存的一些成员变量 // 摄像头开启的 handler private Handler cameraHandler; // Camera session 会话 handler private Handler sessionHandler; //这里是个Context都行 private AppCompatActivity mActivity; // 这个摄像头所有需…...

Apache solr XXE 漏洞(CVE-2017-12629)
任务一: 复现环境中的漏洞 任务二: 利用XXE漏洞发送HTTP请求,在VPS服务器端接受请求,或收到DNS记录 任务三: 利用XXE漏洞读取本地的/etc/passwd文件 1.搭建环境 2.开始看wp的时候没有看懂为什么是core,然…...

HTML代码混淆技术:原理、应用和实现方法详解
HTML代码混淆是一种常用的反爬虫技术,它可以有效地防止爬虫对网站数据的抓取。本文将详细介绍HTML代码混淆技术的原理、应用以及实现方法,帮助大家更好地了解和运用这一技术。 一、HTML代码混淆的原理 HTML代码混淆是指将HTML源码通过特定的算法进行加…...
quickapp_快应用_系统接口应用
系统接口 在项目中使用到的接口都需要在配置文件manifest.json中声明,不然会报如下警告 [WARN] 请在 manifest.json 文件里声明项目代码中用到的接口: system.storage, service.account, system.package, system.webview[1]检查某app是否在手机上安装 官方文档&a…...

LeetCode - 394. 字符串解码
题目 394. 字符串解码 - 力扣(LeetCode) 思路 使用两个栈:一个存储重复次数,一个存储字符串 遍历输入字符串: 数字处理:遇到数字时,累积计算重复次数左括号处理:保存当前状态&a…...

[ICLR 2022]How Much Can CLIP Benefit Vision-and-Language Tasks?
论文网址:pdf 英文是纯手打的!论文原文的summarizing and paraphrasing。可能会出现难以避免的拼写错误和语法错误,若有发现欢迎评论指正!文章偏向于笔记,谨慎食用 目录 1. 心得 2. 论文逐段精读 2.1. Abstract 2…...
Element Plus 表单(el-form)中关于正整数输入的校验规则
目录 1 单个正整数输入1.1 模板1.2 校验规则 2 两个正整数输入(联动)2.1 模板2.2 校验规则2.3 CSS 1 单个正整数输入 1.1 模板 <el-formref"formRef":model"formData":rules"formRules"label-width"150px"…...
C#学习第29天:表达式树(Expression Trees)
目录 什么是表达式树? 核心概念 1.表达式树的构建 2. 表达式树与Lambda表达式 3.解析和访问表达式树 4.动态条件查询 表达式树的优势 1.动态构建查询 2.LINQ 提供程序支持: 3.性能优化 4.元数据处理 5.代码转换和重写 适用场景 代码复杂性…...
c# 局部函数 定义、功能与示例
C# 局部函数:定义、功能与示例 1. 定义与功能 局部函数(Local Function)是嵌套在另一个方法内部的私有方法,仅在包含它的方法内可见。 • 作用:封装仅用于当前方法的逻辑,避免污染类作用域,提升…...

第一篇:Liunx环境下搭建PaddlePaddle 3.0基础环境(Liunx Centos8.5安装Python3.10+pip3.10)
第一篇:Liunx环境下搭建PaddlePaddle 3.0基础环境(Liunx Centos8.5安装Python3.10pip3.10) 一:前言二:安装编译依赖二:安装Python3.10三:安装PIP3.10四:安装Paddlepaddle基础框架4.1…...

【Qt】控件 QWidget
控件 QWidget 一. 控件概述二. QWidget 的核心属性可用状态:enabled几何:geometrywindows frame 窗口框架的影响 窗口标题:windowTitle窗口图标:windowIconqrc 机制 窗口不透明度:windowOpacity光标:cursor…...

轻量安全的密码管理工具Vaultwarden
一、Vaultwarden概述 Vaultwarden主要作用是提供一个自托管的密码管理器服务。它是Bitwarden密码管理器的第三方轻量版,由国外开发者在Bitwarden的基础上,采用Rust语言重写而成。 (一)Vaultwarden镜像的作用及特点 轻量级与高性…...

华为云Flexus+DeepSeek征文 | MaaS平台避坑指南:DeepSeek商用服务开通与成本控制
作者简介 我是摘星,一名专注于云计算和AI技术的开发者。本次通过华为云MaaS平台体验DeepSeek系列模型,将实际使用经验分享给大家,希望能帮助开发者快速掌握华为云AI服务的核心能力。 目录 作者简介 前言 一、技术架构概览 1.1 整体架构设…...
Q1起重机指挥理论备考要点分析
Q1起重机指挥理论备考要点分析 一、考试重点内容概述 Q1起重机指挥理论考试主要包含三大核心模块:安全技术知识(占40%)、指挥信号规范(占30%)和法规标准(占30%)。考试采用百分制,8…...