Scala 练习一 将Mysql表数据导入HBase
Scala 练习一 将Mysql表数据导入HBase
续第一篇:Java代码将Mysql表数据导入HBase表
源码仓库地址:https://gitee.com/leaf-domain/data-to-hbase
- 一、整体介绍
- 二、依赖
- 三、测试结果
- 四、源码
一、整体介绍
-
HBase
特质连接HBase, 创建HBase执行对象
- 初始化配置信息:多条(hbase.zookeeper.quorum=>ip:2181)
Configuration conf = HBaseConfiguration.create()
conf.set(String, String) - 创建连接:多个连接(池化)
Connection con = ConnectionFactory.createConnection() - 创建数据表:表名: String
Table table = con.getTable(TableName)
def build(): HBase // 初始化配置信息 def initPool(): HBase // 初始化连接池 def finish(): Executor // 完成 返回执行对象
- 初始化配置信息:多条(hbase.zookeeper.quorum=>ip:2181)
-
Executor
特质对HBase进行操作的方法: 包含如下函数
def exists(tableName: String): Boolean // 验证数据表是否存在 def create(tableName: String, columnFamilies: Seq[String]): Boolean // 创建数据表 def drop(tableName: String): Boolean // 删除数据表 def put(tableName: String, data: util.List[Put]): Boolean // 批量插入数据
-
Jdbc
封装Jdbc封装
- 初始化连接
driver : com.mysql.cj.jdbc.Driver
参数:url, username, password
创建连接 - 初始化执行器
sql, parameters
创建执行器【初始化参数】 - 执行操作并返回【结果】
DML: 返回影响数据库表行数
DQL: 返回查询的数据集合
EX: 出现异常结果
- 初始化连接
-
MyHBase
用于实现HBase
和Executor
特质 -
测试数据格式
mysql表
SET NAMES utf8mb4; SET FOREIGN_KEY_CHECKS = 0;DROP TABLE IF EXISTS `test_table_for_hbase`; CREATE TABLE `test_table_for_hbase` (`test_id` int NULL DEFAULT NULL,`test_name` varchar(20) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NULL DEFAULT NULL,`test_age` int NULL DEFAULT NULL,`test_gender` varchar(6) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NULL DEFAULT NULL,`test_phone` varchar(11) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NULL DEFAULT NULL ) ENGINE = InnoDB CHARACTER SET = utf8mb4 COLLATE = utf8mb4_0900_ai_ci ROW_FORMAT = Dynamic;INSERT INTO `test_table_for_hbase` VALUES (1, 'testName1', 26, 'male', '18011111112'); INSERT INTO `test_table_for_hbase` VALUES (2, 'testName2', 25, 'female', '18011111113'); INSERT INTO `test_table_for_hbase` VALUES (3, 'testName3', 27, 'male', '18011111114'); INSERT INTO `test_table_for_hbase` VALUES (4, 'testName4', 35, 'male', '18011111115'); -- .... 省略以下数据部分
hbase表
# 创建表 库名:表名, 列族1, 列族2 create "hbase_test:tranfer_from_mysql","baseInfo","scoreInfo" truncate 'hbase_test:tranfer_from_mysql' # 清空hbase_test命名空间下的tranfer_from_mysql表 scan 'hbase_test:tranfer_from_mysql' # 查看表
二、依赖
<dependencies><!-- HBase 驱动 --><dependency><groupId>org.apache.hbase</groupId><artifactId>hbase-client</artifactId><version>2.3.5</version></dependency><!-- Hadoop --><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-common</artifactId><version>3.1.3</version></dependency><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-auth</artifactId><version>3.1.3</version></dependency><!-- mysql --><dependency><groupId>com.mysql</groupId><artifactId>mysql-connector-j</artifactId><version>8.0.33</version></dependency><!-- zookeeper --><dependency><groupId>org.apache.zookeeper</groupId><artifactId>zookeeper</artifactId><version>3.6.3</version></dependency>
</dependencies>
三、测试结果
终端有个日志的小警告(无伤大雅hh),输出为 true
查看hbase表,发现数据正常导入
四、源码
scala代码较简单这里直接上源码了,去除了部分注释,更多请去仓库下载
Executor
package hbase
import org.apache.hadoop.hbase.client.Put
import java.util
trait Executor {def exists(tableName: String): Booleandef create(tableName: String, columnFamilies: Seq[String]): Booleandef drop(tableName: String): Booleandef put(tableName: String, data: util.List[Put]): Boolean
}
HBase
package hbase
import org.apache.hadoop.hbase.client.Connection
trait HBase {protected var statusCode: Int = -1def build(): HBasecase class PoolCon(var available: Boolean, con: Connection) {def out = {available = falsethis}def in = available = true}def initPool(): HBasedef finish(): Executor
}
MyHBase
package hbase.implimport hbase.{Executor, HBase}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.client.{ColumnFamilyDescriptorBuilder, ConnectionFactory, Put, TableDescriptorBuilder}
import org.apache.hadoop.hbase.exceptions.HBaseException
import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}import java.util
import scala.collection.mutable.ArrayBufferclass MyHBase (conf: Map[String, String])(pooled: Boolean = false, poolSize: Int = 3) extends HBase{private lazy val config: Configuration = HBaseConfiguration.create()private lazy val pool: ArrayBuffer[PoolCon] = ArrayBuffer()override def build(): HBase = {if(statusCode == -1){conf.foreach(t => config.set(t._1, t._2))statusCode = 0this}else{throw new HBaseException("build() function must be invoked first")}}override def initPool(): HBase = {if(statusCode == 0){val POOL_SIZE = if (pooled) {if (poolSize <= 0) 3 else poolSize} else 1for (i <- 1 to POOL_SIZE) {pool.append(PoolCon(available = true, ConnectionFactory.createConnection(config)))}statusCode = 1this}else{throw new HBaseException("initPool() function must be invoked only after build()")}}override def finish(): Executor = {if (statusCode == 1) {statusCode = 2new Executor {override def exists(tableName: String): Boolean = {var pc: PoolCon = nulltry{pc = getConval exists = pc.con.getAdmin.tableExists(TableName.valueOf(tableName))pc.inexists}catch {case e: Exception => e.printStackTrace()false}finally {close(pc)}}override def create(tableName: String, columnFamilies: Seq[String]): Boolean = {if (exists(tableName)) {return false}var pc: PoolCon = nulltry {pc = getConval builder: TableDescriptorBuilder = TableDescriptorBuilder.newBuilder(TableName.valueOf(tableName))columnFamilies.foreach(cf => builder.setColumnFamily(ColumnFamilyDescriptorBuilder.of(cf)))pc.con.getAdmin.createTable(builder.build())true} catch {case e: Exception => e.printStackTrace()false} finally {close(pc)}}override def drop(tableName: String): Boolean = {if(!exists(tableName)){return false}var pc: PoolCon = nulltry {pc = getConpc.con.getAdmin.deleteTable(TableName.valueOf(tableName))true} catch {case e: Exception => e.printStackTrace()false} finally {close(pc)}}override def put(tableName: String, data: util.List[Put]): Boolean = {if(!exists(tableName)){return false}var pc: PoolCon = nulltry {pc = getConpc.con.getTable(TableName.valueOf(tableName)).put(data)true} catch {case e: Exception => e.printStackTrace()false} finally {close(pc)}}}}else {throw new HBaseException("finish() function must be invoked only after initPool()")}}private def getCon = {val left: ArrayBuffer[PoolCon] = pool.filter(_.available)if (left.isEmpty) {throw new HBaseException("no available connection")}left.apply(0).out}private def close(con: PoolCon) = {if (null != con) {con.in}}
}object MyHBase{def apply(conf: Map[String, String])(poolSize: Int): MyHBase = new MyHBase(conf)(true, poolSize)
}
Jdbc
package mysql
import java.sql.{Connection, DriverManager, ResultSet, SQLException}
import java.util
object Jdbc {object Result extends Enumeration {val EX = Value(0) val DML = Value(1) val DQL = Value(2) }// 3种结果(异常,DML,DQL)封装case class ResThree(rst: Result.Value) {def to[T <: ResThree]: T = this.asInstanceOf[T]}class Ex(throwable: Throwable) extends ResThree(Result.EX)object Ex {def apply(throwable: Throwable): Ex = new Ex(throwable)}class Dml(affectedRows: Int) extends ResThree(Result.DML) {def update = affectedRows}object Dml {def apply(affectedRows: Int): Dml = new Dml(affectedRows)}class Dql(set: ResultSet) extends ResThree(Result.DQL) {def generate[T](f: ResultSet => T) = {val list: util.List[T] = new util.ArrayList()while (set.next()) {list.add(f(set))}list}}object Dql {def apply(set: ResultSet): Dql = new Dql(set)}// JDBC 函数封装def jdbc(url: String, user: String, password: String)(sql: String, params: Seq[Any] = null): ResThree = {def con() = {// 1.1 显式加载 JDBC 驱动程序(只需要一次)Class.forName("com.mysql.cj.jdbc.Driver")// 1.2 创建连接对象DriverManager.getConnection(url, user, password)}def pst(con: Connection) = {// 2.1 创建执行对象val pst = con.prepareStatement(sql)// 2.2 初始化 SQL 参数if (null != params && params.nonEmpty) {params.zipWithIndex.foreach(t => pst.setObject(t._2 + 1, t._1))}pst}try {val connect = con()val prepared = pst(connect)sql match {case sql if sql.matches("^(insert|INSERT|delete|DELETE|update|UPDATE) .*")=> Dml(prepared.executeUpdate())case sql if sql.matches("^(select|SELECT) .*")=> Dql(prepared.executeQuery())case _ => Ex(new SQLException(s"illegal sql command : $sql"))}} catch {case e: Exception => Ex(e)}}}
Test
import hbase.impl.MyHBase
import mysql.Jdbc._
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.util.Bytes
import java.utilobject Test {def main(args: Array[String]): Unit = {// 初始化MySQL JDBC操作函数val jdbcOpr: (String, Seq[Any]) => ResThree = jdbc(user = "root",url = "jdbc:mysql://localhost:3306/test_db_for_bigdata",password = "123456")// 执行SQL查询,并将结果封装在ResThree对象中val toEntity: ResThree = jdbcOpr("select * from test_table_for_hbase where test_id between ? and ?",Seq(2, 4))// 判断ResThree对象中的结果是否为异常if (toEntity.rst == Result.EX) {// 如果异常,执行异常结果处理toEntity.to[Ex]println("出现异常结果处理")} else {// 如果没有异常,将查询结果转换为HBase的Put对象列表val puts: util.List[Put] = toEntity.to[Dql].generate(rst => {// 创建一个Put对象,表示HBase中的一行val put = new Put(Bytes.toBytes(rst.getInt("test_id")), // row key设置为test_idSystem.currentTimeMillis() // 设置时间戳)// 向Put对象中添加列值// baseInfo是列族名,test_name、test_age、test_gender、test_phone是列名put.addColumn(Bytes.toBytes("baseInfo"), Bytes.toBytes("test_name"),Bytes.toBytes(rst.getString("test_name")))put.addColumn(Bytes.toBytes("baseInfo"), Bytes.toBytes("test_age"),Bytes.toBytes(rst.getString("test_age")) // 注意:这里假设test_age是字符串类型,但通常应为整数类型)put.addColumn(Bytes.toBytes("baseInfo"), Bytes.toBytes("test_gender"),Bytes.toBytes(rst.getString("test_gender")))put.addColumn(Bytes.toBytes("baseInfo"), Bytes.toBytes("test_phone"),Bytes.toBytes(rst.getString("test_phone")))// 返回构建好的Put对象put})// 如果有数据需要插入HBaseif (puts.size() > 0) {// 初始化HBase连接池并执行Put操作val exe = MyHBase(Map("hbase.zookeeper.quorum" -> "single01:2181"))(1).build().initPool().finish()// 执行Put操作,并返回是否成功val bool = exe.put("hbase_test:tranfer_from_mysql", puts)// 打印操作结果println(bool)} else {// 如果没有数据需要插入println("查无数据")}}}
}
相关文章:

Scala 练习一 将Mysql表数据导入HBase
Scala 练习一 将Mysql表数据导入HBase 续第一篇:Java代码将Mysql表数据导入HBase表 源码仓库地址:https://gitee.com/leaf-domain/data-to-hbase 一、整体介绍二、依赖三、测试结果四、源码 一、整体介绍 HBase特质 连接HBase, 创建HBase执行对象 初始化…...

前端工程化:基于Vue.js 3.0的设计与实践
这里写目录标题 《前端工程化:基于Vue.js 3.0的设计与实践》书籍引言本书概述主要内容作者简介为什么选择这本书?结语 《前端工程化:基于Vue.js 3.0的设计与实践》书籍 够买连接—>https://item.jd.com/13952512.html 引言 在前端技术日…...

Linux☞进程控制
在终端执行命令时,Linux会建立进程,程序执行完,进程会被终止;Linux是一个多任务的OS,允许多个进程并发运行; Linxu中启动进程的两种途径: ①手动启动(前台进程(命令gedit)...后台进程(命令‘&’)) ②…...
mybatis离谱bug乱转类型
字符串传入的参数被转成了int: Param("online") String online<if test"online 0">and (heart_time is null or heart_time <![CDATA[ < ]]> UNIX_TIMESTAMP(SUBDATE(now(),INTERVAL 8 MINUTE)) )</if><if test"…...

视频监控管理平台LntonCVS视频汇聚平台充电桩视频监控应用方案
随着新能源汽车的广泛使用,公众对充电设施的安全性和可靠性日益重视。为了提高充电桩的安全管理和站点运营效率,LntonCVS公司推出了一套全面的新能源汽车充电桩视频监控与管理解决方案。 该方案通过安装高分辨率摄像头,对充电桩及其周边区域进…...
VS环境Python:深度探索与实用指南
VS环境Python:深度探索与实用指南 在编程领域,VS环境(Visual Studio环境)与Python的结合,为开发者们提供了一种强大而灵活的开发体验。这种结合不仅提升了开发效率,还增强了代码的可读性和可维护性。然而&…...

SpringBoot整合SpringSecurit(二)通过token进行访问
在文章:SpringBoot整合SpringSecurit(一)实现ajax的登录、退出、权限校验-CSDN博客 里面,使用的session的方式进行保存用户信息的,这一篇文章就是使用token的方式。 在其上进行的改造,可以先看SpringBoot…...
【算法训练 day50 打家劫舍、打家劫舍Ⅱ、打家劫舍Ⅲ】
目录 一、打家劫舍-LeetCode 198思路 二、打家劫舍Ⅱ-LeetCode 213思路 三.打家劫舍Ⅲ-LeeCode 337思路 一、打家劫舍-LeetCode 198 Leecode链接: leetcode 198 思路 dp数组含义为:当前数组范围下能偷到的最多的钱。递推公式为:dp[j] max(dp[j-2]nums[j],dp[j-1…...

YOLOv8改进 | 卷积模块 | 在主干网络中添加/替换蛇形卷积Dynamic Snake Convolution
💡💡💡本专栏所有程序均经过测试,可成功执行💡💡💡 蛇形动态卷积是一种新型的卷积操作,旨在提高对细长和弯曲的管状结构的特征提取能力。它通过自适应地调整卷积核的权重࿰…...
深入解析力扣183题:从不订购的客户(LEFT JOIN与子查询方法详解)
关注微信公众号 数据分析螺丝钉 免费领取价值万元的python/java/商业分析/数据结构与算法学习资料 在本篇文章中,我们将详细解读力扣第183题“从不订购的客户”。通过学习本篇文章,读者将掌握如何使用SQL语句来解决这一问题,并了解相关的复杂…...

牛客NC32 求平方根【简单 二分 Java/Go/C++】
题目 题目链接: https://www.nowcoder.com/practice/09fbfb16140b40499951f55113f2166c 思路 Java代码 import java.util.*;public class Solution {/*** 代码中的类名、方法名、参数名已经指定,请勿修改,直接返回方法规定的值即可*** para…...

王道408数据结构CH3_栈、队列
概述 3.栈、队列和数组 3.1 栈 3.1.1 基本操作 3.1.2 顺序栈 #define Maxsize 50typedef struct{ElemType data[Maxsize];int top; }SqStack;3.1.3 链式栈 typedef struct LinkNode{ElemType data;struct LinkNode *next; }*LiStack;3.2 队列 3.2.1 基本操作 3.2.2 顺序存储…...

Angular 由一个bug说起之六:字体预加载
浏览器在加载一个页面时,会解析网页中的html和css,并开始加载字体文件。字体文件可以通过css中的font-face规则指定,并使用url()函数指定字体文件的路径。 比如下面这样: css font-face {font-family: MyFont;src: url(path/to/font.woff2…...

并查集进阶版
过关代码如下 #define _CRT_SECURE_NO_WARNINGS #include<bits/stdc.h> #include<unordered_set> using namespace std;int n, m; vector<int> edg[400005]; int a[400005], be[400005]; // a的作用就是存放要摧毁 int k; int fa[400005]; int daan[400005]…...

贪心(不相交的开区间、区间选点、带前导的拼接最小数问题)
目录 1.简单贪心 2.区间贪心 不相交的开区间 1.如何删除? 2.如何比较大小 区间选点问题 3.拼接最小数 1.简单贪心 比如:给你一堆数,你来构成最大的几位数 2.区间贪心 不相交的开区间 思路: 首先,如果有两个…...
[力扣题解] 617. 合并二叉树
题目:617. 合并二叉树 思路 递归法遍历,随便一种遍历方式都可以,以前序遍历为例; 代码 class Solution { public:TreeNode* mergeTrees(TreeNode* root1, TreeNode* root2) {if(root1 NULL){return root2;}if(root2 NULL){r…...

kafka-消费者组(SpringBoot整合Kafka)
文章目录 1、消费者组1.1、使用 efak 创建 主题 my_topic1 并建立6个分区并给每个分区建立3个副本1.2、创建生产者发送消息1.3、application.yml配置1.4、创建消费者监听器1.5、创建SpringBoot启动类1.6、屏蔽 kafka debug 日志 logback.xml1.7、引入spring-kafka依赖1.8、消费…...
Redisson知识
使用Redission获取锁 RLock lock redisson.getLock("my-lock"); 一、Redisson使用不指定锁过期时间的方式加锁: lock.lock(); 特点: 1.使用Redisson加的锁,具有自动续期机制,如果业务运行时间较长,运行…...
0103__【C/C++ 单线程性能分析工具 Gprof】 GNU的C/C++ 性能分析工具 Gprof 使用全面指南
【C/C 单线程性能分析工具 Gprof】 GNU的C/C 性能分析工具 Gprof 使用全面指南-CSDN博客...

如何把几个pdf文件合成在一个pdf文件
PDF合并,作为一种常见的文件处理方式,无论是在学术研究、工作汇报还是日常生活中,都有着广泛的应用。本文将详细介绍PDF合并的多种方法,帮助读者轻松掌握这一技能。 打开 “轻云处理pdf官网” 的网站,然后上传pdf。 pd…...
vscode里如何用git
打开vs终端执行如下: 1 初始化 Git 仓库(如果尚未初始化) git init 2 添加文件到 Git 仓库 git add . 3 使用 git commit 命令来提交你的更改。确保在提交时加上一个有用的消息。 git commit -m "备注信息" 4 …...

装饰模式(Decorator Pattern)重构java邮件发奖系统实战
前言 现在我们有个如下的需求,设计一个邮件发奖的小系统, 需求 1.数据验证 → 2. 敏感信息加密 → 3. 日志记录 → 4. 实际发送邮件 装饰器模式(Decorator Pattern)允许向一个现有的对象添加新的功能,同时又不改变其…...

基于距离变化能量开销动态调整的WSN低功耗拓扑控制开销算法matlab仿真
目录 1.程序功能描述 2.测试软件版本以及运行结果展示 3.核心程序 4.算法仿真参数 5.算法理论概述 6.参考文献 7.完整程序 1.程序功能描述 通过动态调整节点通信的能量开销,平衡网络负载,延长WSN生命周期。具体通过建立基于距离的能量消耗模型&am…...

安宝特方案丨XRSOP人员作业标准化管理平台:AR智慧点检验收套件
在选煤厂、化工厂、钢铁厂等过程生产型企业,其生产设备的运行效率和非计划停机对工业制造效益有较大影响。 随着企业自动化和智能化建设的推进,需提前预防假检、错检、漏检,推动智慧生产运维系统数据的流动和现场赋能应用。同时,…...

基于Flask实现的医疗保险欺诈识别监测模型
基于Flask实现的医疗保险欺诈识别监测模型 项目截图 项目简介 社会医疗保险是国家通过立法形式强制实施,由雇主和个人按一定比例缴纳保险费,建立社会医疗保险基金,支付雇员医疗费用的一种医疗保险制度, 它是促进社会文明和进步的…...

项目部署到Linux上时遇到的错误(Redis,MySQL,无法正确连接,地址占用问题)
Redis无法正确连接 在运行jar包时出现了这样的错误 查询得知问题核心在于Redis连接失败,具体原因是客户端发送了密码认证请求,但Redis服务器未设置密码 1.为Redis设置密码(匹配客户端配置) 步骤: 1).修…...
Angular微前端架构:Module Federation + ngx-build-plus (Webpack)
以下是一个完整的 Angular 微前端示例,其中使用的是 Module Federation 和 npx-build-plus 实现了主应用(Shell)与子应用(Remote)的集成。 🛠️ 项目结构 angular-mf/ ├── shell-app/ # 主应用&…...
React---day11
14.4 react-redux第三方库 提供connect、thunk之类的函数 以获取一个banner数据为例子 store: 我们在使用异步的时候理应是要使用中间件的,但是configureStore 已经自动集成了 redux-thunk,注意action里面要返回函数 import { configureS…...

基于Java+MySQL实现(GUI)客户管理系统
客户资料管理系统的设计与实现 第一章 需求分析 1.1 需求总体介绍 本项目为了方便维护客户信息为了方便维护客户信息,对客户进行统一管理,可以把所有客户信息录入系统,进行维护和统计功能。可通过文件的方式保存相关录入数据,对…...

面向无人机海岸带生态系统监测的语义分割基准数据集
描述:海岸带生态系统的监测是维护生态平衡和可持续发展的重要任务。语义分割技术在遥感影像中的应用为海岸带生态系统的精准监测提供了有效手段。然而,目前该领域仍面临一个挑战,即缺乏公开的专门面向海岸带生态系统的语义分割基准数据集。受…...