导入JDBC元数据到Apache Atlas
前言
前期实现了导入MySQL元数据到Apache Atlas, 由于是初步版本,且功能参照Atlas Hive Hook,实现的不够完美
本期对功能进行改进,实现了导入多种关系型数据库元数据到Apache Atlas
数据库schema与catalog
按照SQL
标准的解释,在SQL
环境下Catalog
和Schema
都属于抽象概念,可以把它们理解为一个容器或者数据库对象命名空间中的一个层次,主要用来解决命名冲突问题。从概念上说,一个数据库系统包含多个Catalog
,每个Catalog
又包含多个Schema
,而每个Schema
又包含多个数据库对象(表、视图、字段等),反过来讲一个数据库对象必然属于一个Schema
,而该Schema
又必然属于一个Catalog
,这样我们就可以得到该数据库对象的完全限定名称,从而解决命名冲突的问题了;例如数据库对象表的完全限定名称就可以表示为:Catalog
名称.Schema
名称.表名称。这里还有一点需要注意的是,SQL
标准并不要求每个数据库对象的完全限定名称是唯一的。
从实现的角度来看,各种数据库系统对Catalog
和Schema
的支持和实现方式千差万别,针对具体问题需要参考具体的产品说明书,比较简单而常用的实现方式是使用数据库名作为Catalog
名,使用用户名作为Schema
名,具体可参见下表:
表1 常用数据库
供应商 | Catalog支持 | Schema支持 |
---|---|---|
Oracle | 不支持 | Oracle User ID |
MySQL | 不支持 | 数据库名 |
MS SQL Server | 数据库名 | 对象属主名,2005版开始有变 |
DB2 | 指定数据库对象时,Catalog部分省略 | Catalog属主名 |
Sybase | 数据库名 | 数据库属主名 |
Informix | 不支持 | 不需要 |
PointBase | 不支持 | 数据库名 |
原文:https://www.cnblogs.com/ECNB/p/4611309.html
元数据模型层级抽象
不同的关系型数据库,其数据库模式有所区别,对应与下面的层级关系
- Datasource -> Catalog -> Schema -> Table -> Column
- Datasource -> Catalog -> Table -> Column
- Datasource -> Schema -> Table -> Column
元数据转换设计
提供元数据
借鉴Apache DolphinScheduler中获取Connection
的方式,不多赘述。
public Connection getConnection(DbType dbType, ConnectionParam connectionParam) throws ExecutionException {BaseConnectionParam baseConnectionParam = (BaseConnectionParam) connectionParam;String datasourceUniqueId = DataSourceUtils.getDatasourceUniqueId(baseConnectionParam, dbType);logger.info("Get connection from datasource {}", datasourceUniqueId);DataSourceClient dataSourceClient = uniqueId2dataSourceClientCache.get(datasourceUniqueId, () -> {Map<String, DataSourceChannel> dataSourceChannelMap = dataSourcePluginManager.getDataSourceChannelMap();DataSourceChannel dataSourceChannel = dataSourceChannelMap.get(dbType.getDescp());if (null == dataSourceChannel) {throw new RuntimeException(String.format("datasource plugin '%s' is not found", dbType.getDescp()));}return dataSourceChannel.createDataSourceClient(baseConnectionParam, dbType);});return dataSourceClient.getConnection();}
转换元数据
- 元数据模型
创建数据库的元数据模型
private AtlasEntityDef createJdbcDatabaseDef() {AtlasEntityDef typeDef = createClassTypeDef(DatabaseProperties.JDBC_TYPE_DATABASE,Collections.singleton(DatabaseProperties.ENTITY_TYPE_DATASET),createOptionalAttrDef(DatabaseProperties.ATTR_URL, "string"),createOptionalAttrDef(DatabaseProperties.ATTR_DRIVER_NAME, "string"),createOptionalAttrDef(DatabaseProperties.ATTR_PRODUCT_NAME, "string"),createOptionalAttrDef(DatabaseProperties.ATTR_PRODUCT_VERSION, "string"));typeDef.setServiceType(DatabaseProperties.ENTITY_SERVICE_TYPE);return typeDef;
}
创建数据库模式的元数据模型
private AtlasEntityDef createJdbcSchemaDef() {AtlasEntityDef typeDef = AtlasTypeUtil.createClassTypeDef(SchemaProperties.JDBC_TYPE_SCHEMA,Collections.singleton(SchemaProperties.ENTITY_TYPE_DATASET));typeDef.setServiceType(SchemaProperties.ENTITY_SERVICE_TYPE);typeDef.setOptions(new HashMap<>() {{put("schemaElementsAttribute", "tables");}});return typeDef;
}
创建数据库表的元数据模型
private AtlasEntityDef createJdbcTableDef() {AtlasEntityDef typeDef = createClassTypeDef(TableProperties.JDBC_TYPE_TABLE,Collections.singleton(TableProperties.ENTITY_TYPE_DATASET),createOptionalAttrDef(TableProperties.ATTR_TABLE_TYPE, "string"));typeDef.setServiceType(BaseProperties.ENTITY_SERVICE_TYPE);typeDef.setOptions(new HashMap<>() {{put("schemaElementsAttribute", "columns");}});return typeDef;
}
创建数据库列的元数据模型
private AtlasEntityDef createJdbcColumnDef() {AtlasEntityDef typeDef = createClassTypeDef(ColumnProperties.JDBC_TYPE_COLUMN,Collections.singleton(ColumnProperties.ENTITY_TYPE_DATASET),createOptionalAttrDef(ColumnProperties.ATTR_COLUMN_TYPE, "string"),createOptionalAttrDef(ColumnProperties.ATTR_IS_PRIMARY_KEY, "string"),createOptionalAttrDef(ColumnProperties.ATTR_COLUMN_IS_NULLABLE, "string"),createOptionalAttrDef(ColumnProperties.ATTR_COLUMN_DEFAULT_VALUE, "string"),createOptionalAttrDef(ColumnProperties.ATTR_COLUMN_AUTO_INCREMENT, "string"));typeDef.setServiceType(BaseProperties.ENTITY_SERVICE_TYPE);HashMap<String, String> options = new HashMap<>() {{put("schemaAttributes", "[\"name\", \"isPrimaryKey\", \"columnType\", \"isNullable\" , \"isAutoIncrement\", \"description\"]");}};typeDef.setOptions(options);return typeDef;
}
创建实体之间的关系模型
private List<AtlasRelationshipDef> createAtlasRelationshipDef() {String version = "1.0";// 数据库和模式的关系AtlasRelationshipDef databaseSchemasDef = createRelationshipTypeDef(BaseProperties.RELATIONSHIP_DATABASE_SCHEMAS,BaseProperties.RELATIONSHIP_DATABASE_SCHEMAS,version, COMPOSITION, AtlasRelationshipDef.PropagateTags.NONE,createRelationshipEndDef(BaseProperties.JDBC_TYPE_DATABASE, "schemas", SET, true),createRelationshipEndDef(BaseProperties.JDBC_TYPE_SCHEMA, "database", SINGLE, false));databaseSchemasDef.setServiceType(BaseProperties.ENTITY_SERVICE_TYPE);AtlasRelationshipDef databaseTablesDef = createRelationshipTypeDef(BaseProperties.RELATIONSHIP_DATABASE_TABLES,BaseProperties.RELATIONSHIP_DATABASE_TABLES,version, AGGREGATION, AtlasRelationshipDef.PropagateTags.NONE,createRelationshipEndDef(BaseProperties.JDBC_TYPE_DATABASE, "tables", SET, true),createRelationshipEndDef(BaseProperties.JDBC_TYPE_TABLE, "database", SINGLE, false));databaseTablesDef.setServiceType(BaseProperties.ENTITY_SERVICE_TYPE);// 模式和数据表的关系// 注意 schema 已经被使用, 需要更换否则会冲突, 例如改为 Jschema(jdbc_schema)AtlasRelationshipDef schemaTablesDef = createRelationshipTypeDef(BaseProperties.RELATIONSHIP_SCHEMA_TABLES,BaseProperties.RELATIONSHIP_SCHEMA_TABLES,version, AGGREGATION, AtlasRelationshipDef.PropagateTags.NONE,createRelationshipEndDef(BaseProperties.JDBC_TYPE_SCHEMA, "tables", SET, true),createRelationshipEndDef(BaseProperties.JDBC_TYPE_TABLE, "Jschema", SINGLE, false));schemaTablesDef.setServiceType(BaseProperties.ENTITY_SERVICE_TYPE);// 表和数据列的关系AtlasRelationshipDef tableColumnsDef = createRelationshipTypeDef(BaseProperties.RELATIONSHIP_TABLE_COLUMNS,BaseProperties.RELATIONSHIP_TABLE_COLUMNS,version, COMPOSITION, AtlasRelationshipDef.PropagateTags.NONE,createRelationshipEndDef(BaseProperties.JDBC_TYPE_TABLE, "columns", SET, true),createRelationshipEndDef(BaseProperties.JDBC_TYPE_COLUMN, "table", SINGLE, false));tableColumnsDef.setServiceType(BaseProperties.ENTITY_SERVICE_TYPE);return Arrays.asList(databaseSchemasDef, databaseTablesDef, schemaTablesDef, tableColumnsDef);
}
-
提取元数据
不再赘述
-
转换元数据
使用工厂模式,提供不同类型的元数据转换方式
public interface JdbcTransferFactory {JdbcTransfer getTransfer(DatabaseMetaData metaData, AtlasClientV2 client);boolean supportType(String type);String getName();
}
List ignorePatterns 用来过滤不想导入的数据库元数据,例如mysql
的information_schema
public interface JdbcTransfer {void transfer();JdbcTransfer setIgnorePatterns(List<Pattern> ignorePatterns);
}
举例:JdbcMysqlTransfer 和 MysqlTransferFactory
@AutoService(JdbcTransferFactory.class)
public class MysqlTransferFactory implements JdbcTransferFactory {public static final String MYSQL = "mysql";@Overridepublic JdbcTransfer getTransfer(DatabaseMetaData metaData, AtlasClientV2 client) {return new JdbcMysqlTransfer(metaData, client);}@Overridepublic boolean supportType(String type) {return MYSQL.equalsIgnoreCase(type);}@Overridepublic String getName() {return MYSQL;}
}
public class JdbcMysqlTransfer implements JdbcTransfer {private final Jdbc jdbc;private final AtlasService atlasService;private List<Pattern> ignorePatterns;public JdbcMysqlTransfer(DatabaseMetaData metaData, AtlasClientV2 client) {this.jdbc = new Jdbc(new JdbcMetadata(metaData));this.atlasService = new AtlasService(client);this.ignorePatterns = Collections.emptyList();}@Overridepublic JdbcTransfer setIgnorePatterns(List<Pattern> ignorePatterns) {this.ignorePatterns = ignorePatterns;return this;}private boolean tableIsNotIgnored(String tableName) {return ignorePatterns.stream().noneMatch(regex -> regex.matcher(tableName).matches());}@Overridepublic void transfer() {// 1.数据库实体转换DatabaseTransfer databaseTransfer = new DatabaseTransfer(atlasService);AtlasEntity databaseEntity = databaseTransfer.apply(jdbc);// 2.表实体转换String catalog = (String) databaseEntity.getAttribute(BaseProperties.ATTR_NAME);List<AtlasEntity> tableEntities = jdbc.getTables(catalog, catalog).parallelStream().filter(jdbcTable -> tableIsNotIgnored(jdbcTable.getTableName())).map(new TableTransfer(atlasService, databaseEntity)).toList();// 3.列转换for (AtlasEntity tableEntity : tableEntities) {String tableName = (String) tableEntity.getAttribute(BaseProperties.ATTR_NAME);List<JdbcPrimaryKey> primaryKeys = jdbc.getPrimaryKeys(catalog, tableName);jdbc.getColumns(catalog, catalog, tableName).parallelStream().forEach(new ColumnTransfer(atlasService, tableEntity, primaryKeys));}}}
- 元数据存入Atlas
public class DatabaseTransfer implements Function<Jdbc, AtlasEntity> {private final AtlasService atlasService;public DatabaseTransfer(AtlasService atlasService) {this.atlasService = atlasService;}@Overridepublic AtlasEntity apply(Jdbc jdbc) {String userName = jdbc.getUserName();String driverName = jdbc.getDriverName();String productName = jdbc.getDatabaseProductName();String productVersion = jdbc.getDatabaseProductVersion();String url = jdbc.getUrl();String urlWithNoParams = url.contains("?") ? url.substring(0, url.indexOf("?")) : url;String catalogName = urlWithNoParams.substring(urlWithNoParams.lastIndexOf("/") + 1);// 特殊处理 Oracleif (productName.equalsIgnoreCase("oracle")){catalogName = userName.toUpperCase();urlWithNoParams = urlWithNoParams + "/" + catalogName;}DatabaseProperties properties = new DatabaseProperties();properties.setQualifiedName(urlWithNoParams);properties.setDisplayName(catalogName);properties.setOwner(userName);properties.setUrl(url);properties.setDriverName(driverName);properties.setProductName(productName);properties.setProductVersion(productVersion);// 1.创建Atlas EntityAtlasEntity atlasEntity = new AtlasEntity(DatabaseProperties.JDBC_TYPE_DATABASE, properties.getAttributes());// 2.判断是否存在实体, 存在则填充GUIDMap<String, String> searchParam = Collections.singletonMap(DatabaseProperties.ATTR_QUALIFIED_NAME, urlWithNoParams);Optional<AtlasEntityHeader> entityHeader = atlasService.checkAtlasEntityExists(DatabaseProperties.JDBC_TYPE_DATABASE, searchParam);entityHeader.ifPresent(header -> atlasEntity.setGuid(header.getGuid()));// 3,存储或者更新到Atlas中if (entityHeader.isPresent()){atlasService.createAtlasEntity(new AtlasEntity.AtlasEntityWithExtInfo(atlasEntity));}else {AtlasEntityHeader header = atlasService.createAtlasEntity(new AtlasEntity.AtlasEntityWithExtInfo(atlasEntity));atlasEntity.setGuid(header.getGuid());}return atlasEntity;}
}
效果展示
- 元数据类型定义
- 测试导入元数据
由于mysql没有采用schema,因此jdbc_schema为空
如图所示,可以清晰的了解mysql数据库中demo数据库的数据表内容
数据表元数据,qualifiedName使用数据库连接url
.表名
如同所示,数据表内各个列的元数据;可以清晰的了解该数据表的各个字段信息
相关文章:

导入JDBC元数据到Apache Atlas
前言 前期实现了导入MySQL元数据到Apache Atlas, 由于是初步版本,且功能参照Atlas Hive Hook,实现的不够完美 本期对功能进行改进,实现了导入多种关系型数据库元数据到Apache Atlas 数据库schema与catalog 按照SQL标准的解释,…...

大数据项目——基于Django/协同过滤算法的房源可视化分析推荐系统的设计与实现
大数据项目——基于Django/协同过滤算法的房源可视化分析推荐系统的设计与实现 技术栈:大数据爬虫/机器学习学习算法/数据分析与挖掘/大数据可视化/Django框架/Mysql数据库 本项目基于 Django框架开发的房屋可视化分析推荐系统。这个系统结合了大数据爬虫、机器学…...

[网鼎杯 2020 朱雀组]phpweb1
提示 call_user_func()函数先通过php内置函数来进行代码审计绕过system(##不止一种方法) 拿到题目养成一个好的习惯先抓个包 从抓到的包以及它首页的报错来看,这里死活会post传输两个参数func以及p func传输函数,而p则是传输参数的…...

深度学习之注意力机制
注意力机制与外部记忆 注意力机制与记忆增强网络是相辅相成的,神经网络去从内存中或者外部记忆中选出与当前输入相关的内容时需要注意力机制,而在注意力机制的很多应用场景中,我们的外部信息也可以看作是一个外部的记忆 这是一个阅读理解任务…...

WordPress:解决xmlrpc.php被扫描爆破的风险
使用WordPress的朋友都知道,一些【垃圾渣渣】会利用xmlrpc.php文件来进行攻击,绕过WP后台错误登录次数限制进行爆破。虽然密码复杂的极难爆破,但及其占用服务器资源。 方法一、利用宝塔防火墙(收费版) 一般可以直接使…...

Fiddler抓包模拟器(雷电模拟器)
Fiddler设置 List item 打开fiddler,的options 点击OK,重启fiddler 模拟器 更改网络设置 IP可以在电脑上终端上查看 然后在模拟器浏览器中输入IP:端口 安装证书...
RepidJson将内容写入文件
使用 RapidJSON 将内容写入文件的步骤如下: 创建一个 rapidjson::Document 对象,将需要写入文件的内容存储到其中。创建一个 rapidjson::StringBuffer 对象来保存 JSON 字符串。将 rapidjson::Document 对象转换为 JSON 字符串,并将其放入 r…...

Endnote使用教程
原由 最近要进行开题报告,要求不低于60文献的阅读与引用,单独插入引入我觉得是非常繁琐的事情,所以就借助Endnote这个工具,减少我们的工作量。 使用方法 第一步:先新建一个数据库,这样子可以在这个数据库…...
java中用Thead创建线程和用Runnable创建线程的区别是什么?
在 Java 中,创建线程的两种主要方式是通过继承 Thread 类和通过实现 Runnable 接口。下面是它们之间的主要区别: 1. 继承 Thread 类: class MyThread extends Thread {public void run() {// 线程执行的代码} }// 创建并启动线程 MyThread …...

0013Java程序设计-基于Vue的上课签到系统的设计与实现
文章目录 **摘 要**目录系统设计4.2学生签到4.3 签到信息列表4.4 用户信息管理5.1系统登录5.1.1 登录5.1.2 清除用户登记记录5.1.3 登录拦截 5.2用户管理5.2.2 用户添加5.2.3 用户编辑5.2.4 用户删除5.2.5 用户分页 5.3签到信息5.3.1签到信息列表 5.4学生签到5.4.1学生签到 开发…...

2.修改列名与列的数据类型
修改字段名与字段数据类型 1.修改字段名 有时,在我们建好一张表后会突然发现,哎呀!字段名貌似写错了!怎么办?要删了表再重新建一个新表吗?还是要删了这个字段再新建一个新的字段? 都不用&…...

[Firefly-Linux] RK3568 Ubuntu固件分区详解
RK为了方便开发与产品定制,自己定义了一套固件的分区,这些分区信息存放在parameter.txt文件中,Firefly参考这个文件定义了自己的Ubuntu分区,文件为parameter-ubuntu.txt,存放于Linux_SDK的device/rockchip/rk356x目录下…...

SpringBoot项目访问resources下的静态资源
1.新建一个配置文件夹,放配置类 2.编辑 WebMvcConfig.java package com.southwind.configuration;import org.springframework.context.annotation.Configuration; import org.springframework.web.servlet.config.annotation.ResourceHandlerRegistry; import or…...
Qt之面试经验
1.恒生芸擎网络 技术没怎么问,一面问对方工作日常会涉及的一些东西(自动发布),二面公司流程,三面其他(没发offer) 2.光珀智能科技 涉及AI算法落地,问了点基础问题,比如…...
数据库基础概念与范式反范式总结
文章目录 一、基本概念1、属性2、元组3、关系4、超键5、候选键6、主键7、主属性8、外键9、函数依赖完全依赖 二、数据库范式1、第一范式(1NF)2、第二范式(2NF)3、第三范式(3NF)4、巴斯-科德范式(…...
tanstack/react-query使用手册
1. useQuery useQuery的使用一、data是后端成功返回的数据, 第一次的值为undefined 二、isLoading是指数据是否正在加载的状态,通常用于判断请求是否还在进行中。当isLoading为true时,表示数据正在加载中,当isLoading为false时&a…...
camera2对摄像头编码h264
MediaCodec编码摄像头数据 前置:保存的一些成员变量 // 摄像头开启的 handler private Handler cameraHandler; // Camera session 会话 handler private Handler sessionHandler; //这里是个Context都行 private AppCompatActivity mActivity; // 这个摄像头所有需…...

Apache solr XXE 漏洞(CVE-2017-12629)
任务一: 复现环境中的漏洞 任务二: 利用XXE漏洞发送HTTP请求,在VPS服务器端接受请求,或收到DNS记录 任务三: 利用XXE漏洞读取本地的/etc/passwd文件 1.搭建环境 2.开始看wp的时候没有看懂为什么是core,然…...

HTML代码混淆技术:原理、应用和实现方法详解
HTML代码混淆是一种常用的反爬虫技术,它可以有效地防止爬虫对网站数据的抓取。本文将详细介绍HTML代码混淆技术的原理、应用以及实现方法,帮助大家更好地了解和运用这一技术。 一、HTML代码混淆的原理 HTML代码混淆是指将HTML源码通过特定的算法进行加…...
quickapp_快应用_系统接口应用
系统接口 在项目中使用到的接口都需要在配置文件manifest.json中声明,不然会报如下警告 [WARN] 请在 manifest.json 文件里声明项目代码中用到的接口: system.storage, service.account, system.package, system.webview[1]检查某app是否在手机上安装 官方文档&a…...

IDEA运行Tomcat出现乱码问题解决汇总
最近正值期末周,有很多同学在写期末Java web作业时,运行tomcat出现乱码问题,经过多次解决与研究,我做了如下整理: 原因: IDEA本身编码与tomcat的编码与Windows编码不同导致,Windows 系统控制台…...
变量 varablie 声明- Rust 变量 let mut 声明与 C/C++ 变量声明对比分析
一、变量声明设计:let 与 mut 的哲学解析 Rust 采用 let 声明变量并通过 mut 显式标记可变性,这种设计体现了语言的核心哲学。以下是深度解析: 1.1 设计理念剖析 安全优先原则:默认不可变强制开发者明确声明意图 let x 5; …...

Zustand 状态管理库:极简而强大的解决方案
Zustand 是一个轻量级、快速和可扩展的状态管理库,特别适合 React 应用。它以简洁的 API 和高效的性能解决了 Redux 等状态管理方案中的繁琐问题。 核心优势对比 基本使用指南 1. 创建 Store // store.js import create from zustandconst useStore create((set)…...

2025年能源电力系统与流体力学国际会议 (EPSFD 2025)
2025年能源电力系统与流体力学国际会议(EPSFD 2025)将于本年度在美丽的杭州盛大召开。作为全球能源、电力系统以及流体力学领域的顶级盛会,EPSFD 2025旨在为来自世界各地的科学家、工程师和研究人员提供一个展示最新研究成果、分享实践经验及…...

.Net框架,除了EF还有很多很多......
文章目录 1. 引言2. Dapper2.1 概述与设计原理2.2 核心功能与代码示例基本查询多映射查询存储过程调用 2.3 性能优化原理2.4 适用场景 3. NHibernate3.1 概述与架构设计3.2 映射配置示例Fluent映射XML映射 3.3 查询示例HQL查询Criteria APILINQ提供程序 3.4 高级特性3.5 适用场…...
Robots.txt 文件
什么是robots.txt? robots.txt 是一个位于网站根目录下的文本文件(如:https://example.com/robots.txt),它用于指导网络爬虫(如搜索引擎的蜘蛛程序)如何抓取该网站的内容。这个文件遵循 Robots…...
Spring AI 入门:Java 开发者的生成式 AI 实践之路
一、Spring AI 简介 在人工智能技术快速迭代的今天,Spring AI 作为 Spring 生态系统的新生力量,正在成为 Java 开发者拥抱生成式 AI 的最佳选择。该框架通过模块化设计实现了与主流 AI 服务(如 OpenAI、Anthropic)的无缝对接&…...

【JVM面试篇】高频八股汇总——类加载和类加载器
目录 1. 讲一下类加载过程? 2. Java创建对象的过程? 3. 对象的生命周期? 4. 类加载器有哪些? 5. 双亲委派模型的作用(好处)? 6. 讲一下类的加载和双亲委派原则? 7. 双亲委派模…...

基于Java+VUE+MariaDB实现(Web)仿小米商城
仿小米商城 环境安装 nodejs maven JDK11 运行 mvn clean install -DskipTestscd adminmvn spring-boot:runcd ../webmvn spring-boot:runcd ../xiaomi-store-admin-vuenpm installnpm run servecd ../xiaomi-store-vuenpm installnpm run serve 注意:运行前…...

Unity中的transform.up
2025年6月8日,周日下午 在Unity中,transform.up是Transform组件的一个属性,表示游戏对象在世界空间中的“上”方向(Y轴正方向),且会随对象旋转动态变化。以下是关键点解析: 基本定义 transfor…...