Scala 练习一 将Mysql表数据导入HBase
Scala 练习一 将Mysql表数据导入HBase
续第一篇:Java代码将Mysql表数据导入HBase表
源码仓库地址:https://gitee.com/leaf-domain/data-to-hbase
- 一、整体介绍
- 二、依赖
- 三、测试结果
- 四、源码
一、整体介绍
-
HBase
特质连接HBase, 创建HBase执行对象
- 初始化配置信息:多条(hbase.zookeeper.quorum=>ip:2181)
Configuration conf = HBaseConfiguration.create()
conf.set(String, String) - 创建连接:多个连接(池化)
Connection con = ConnectionFactory.createConnection() - 创建数据表:表名: String
Table table = con.getTable(TableName)
def build(): HBase // 初始化配置信息 def initPool(): HBase // 初始化连接池 def finish(): Executor // 完成 返回执行对象
- 初始化配置信息:多条(hbase.zookeeper.quorum=>ip:2181)
-
Executor
特质对HBase进行操作的方法: 包含如下函数
def exists(tableName: String): Boolean // 验证数据表是否存在 def create(tableName: String, columnFamilies: Seq[String]): Boolean // 创建数据表 def drop(tableName: String): Boolean // 删除数据表 def put(tableName: String, data: util.List[Put]): Boolean // 批量插入数据
-
Jdbc
封装Jdbc封装
- 初始化连接
driver : com.mysql.cj.jdbc.Driver
参数:url, username, password
创建连接 - 初始化执行器
sql, parameters
创建执行器【初始化参数】 - 执行操作并返回【结果】
DML: 返回影响数据库表行数
DQL: 返回查询的数据集合
EX: 出现异常结果
- 初始化连接
-
MyHBase
用于实现HBase
和Executor
特质 -
测试数据格式
mysql表
SET NAMES utf8mb4; SET FOREIGN_KEY_CHECKS = 0;DROP TABLE IF EXISTS `test_table_for_hbase`; CREATE TABLE `test_table_for_hbase` (`test_id` int NULL DEFAULT NULL,`test_name` varchar(20) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NULL DEFAULT NULL,`test_age` int NULL DEFAULT NULL,`test_gender` varchar(6) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NULL DEFAULT NULL,`test_phone` varchar(11) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NULL DEFAULT NULL ) ENGINE = InnoDB CHARACTER SET = utf8mb4 COLLATE = utf8mb4_0900_ai_ci ROW_FORMAT = Dynamic;INSERT INTO `test_table_for_hbase` VALUES (1, 'testName1', 26, 'male', '18011111112'); INSERT INTO `test_table_for_hbase` VALUES (2, 'testName2', 25, 'female', '18011111113'); INSERT INTO `test_table_for_hbase` VALUES (3, 'testName3', 27, 'male', '18011111114'); INSERT INTO `test_table_for_hbase` VALUES (4, 'testName4', 35, 'male', '18011111115'); -- .... 省略以下数据部分
hbase表
# 创建表 库名:表名, 列族1, 列族2 create "hbase_test:tranfer_from_mysql","baseInfo","scoreInfo" truncate 'hbase_test:tranfer_from_mysql' # 清空hbase_test命名空间下的tranfer_from_mysql表 scan 'hbase_test:tranfer_from_mysql' # 查看表
二、依赖
<dependencies><!-- HBase 驱动 --><dependency><groupId>org.apache.hbase</groupId><artifactId>hbase-client</artifactId><version>2.3.5</version></dependency><!-- Hadoop --><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-common</artifactId><version>3.1.3</version></dependency><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-auth</artifactId><version>3.1.3</version></dependency><!-- mysql --><dependency><groupId>com.mysql</groupId><artifactId>mysql-connector-j</artifactId><version>8.0.33</version></dependency><!-- zookeeper --><dependency><groupId>org.apache.zookeeper</groupId><artifactId>zookeeper</artifactId><version>3.6.3</version></dependency>
</dependencies>
三、测试结果
终端有个日志的小警告(无伤大雅hh),输出为 true
查看hbase表,发现数据正常导入
四、源码
scala代码较简单这里直接上源码了,去除了部分注释,更多请去仓库下载
Executor
package hbase
import org.apache.hadoop.hbase.client.Put
import java.util
trait Executor {def exists(tableName: String): Booleandef create(tableName: String, columnFamilies: Seq[String]): Booleandef drop(tableName: String): Booleandef put(tableName: String, data: util.List[Put]): Boolean
}
HBase
package hbase
import org.apache.hadoop.hbase.client.Connection
trait HBase {protected var statusCode: Int = -1def build(): HBasecase class PoolCon(var available: Boolean, con: Connection) {def out = {available = falsethis}def in = available = true}def initPool(): HBasedef finish(): Executor
}
MyHBase
package hbase.implimport hbase.{Executor, HBase}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.client.{ColumnFamilyDescriptorBuilder, ConnectionFactory, Put, TableDescriptorBuilder}
import org.apache.hadoop.hbase.exceptions.HBaseException
import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}import java.util
import scala.collection.mutable.ArrayBufferclass MyHBase (conf: Map[String, String])(pooled: Boolean = false, poolSize: Int = 3) extends HBase{private lazy val config: Configuration = HBaseConfiguration.create()private lazy val pool: ArrayBuffer[PoolCon] = ArrayBuffer()override def build(): HBase = {if(statusCode == -1){conf.foreach(t => config.set(t._1, t._2))statusCode = 0this}else{throw new HBaseException("build() function must be invoked first")}}override def initPool(): HBase = {if(statusCode == 0){val POOL_SIZE = if (pooled) {if (poolSize <= 0) 3 else poolSize} else 1for (i <- 1 to POOL_SIZE) {pool.append(PoolCon(available = true, ConnectionFactory.createConnection(config)))}statusCode = 1this}else{throw new HBaseException("initPool() function must be invoked only after build()")}}override def finish(): Executor = {if (statusCode == 1) {statusCode = 2new Executor {override def exists(tableName: String): Boolean = {var pc: PoolCon = nulltry{pc = getConval exists = pc.con.getAdmin.tableExists(TableName.valueOf(tableName))pc.inexists}catch {case e: Exception => e.printStackTrace()false}finally {close(pc)}}override def create(tableName: String, columnFamilies: Seq[String]): Boolean = {if (exists(tableName)) {return false}var pc: PoolCon = nulltry {pc = getConval builder: TableDescriptorBuilder = TableDescriptorBuilder.newBuilder(TableName.valueOf(tableName))columnFamilies.foreach(cf => builder.setColumnFamily(ColumnFamilyDescriptorBuilder.of(cf)))pc.con.getAdmin.createTable(builder.build())true} catch {case e: Exception => e.printStackTrace()false} finally {close(pc)}}override def drop(tableName: String): Boolean = {if(!exists(tableName)){return false}var pc: PoolCon = nulltry {pc = getConpc.con.getAdmin.deleteTable(TableName.valueOf(tableName))true} catch {case e: Exception => e.printStackTrace()false} finally {close(pc)}}override def put(tableName: String, data: util.List[Put]): Boolean = {if(!exists(tableName)){return false}var pc: PoolCon = nulltry {pc = getConpc.con.getTable(TableName.valueOf(tableName)).put(data)true} catch {case e: Exception => e.printStackTrace()false} finally {close(pc)}}}}else {throw new HBaseException("finish() function must be invoked only after initPool()")}}private def getCon = {val left: ArrayBuffer[PoolCon] = pool.filter(_.available)if (left.isEmpty) {throw new HBaseException("no available connection")}left.apply(0).out}private def close(con: PoolCon) = {if (null != con) {con.in}}
}object MyHBase{def apply(conf: Map[String, String])(poolSize: Int): MyHBase = new MyHBase(conf)(true, poolSize)
}
Jdbc
package mysql
import java.sql.{Connection, DriverManager, ResultSet, SQLException}
import java.util
object Jdbc {object Result extends Enumeration {val EX = Value(0) val DML = Value(1) val DQL = Value(2) }// 3种结果(异常,DML,DQL)封装case class ResThree(rst: Result.Value) {def to[T <: ResThree]: T = this.asInstanceOf[T]}class Ex(throwable: Throwable) extends ResThree(Result.EX)object Ex {def apply(throwable: Throwable): Ex = new Ex(throwable)}class Dml(affectedRows: Int) extends ResThree(Result.DML) {def update = affectedRows}object Dml {def apply(affectedRows: Int): Dml = new Dml(affectedRows)}class Dql(set: ResultSet) extends ResThree(Result.DQL) {def generate[T](f: ResultSet => T) = {val list: util.List[T] = new util.ArrayList()while (set.next()) {list.add(f(set))}list}}object Dql {def apply(set: ResultSet): Dql = new Dql(set)}// JDBC 函数封装def jdbc(url: String, user: String, password: String)(sql: String, params: Seq[Any] = null): ResThree = {def con() = {// 1.1 显式加载 JDBC 驱动程序(只需要一次)Class.forName("com.mysql.cj.jdbc.Driver")// 1.2 创建连接对象DriverManager.getConnection(url, user, password)}def pst(con: Connection) = {// 2.1 创建执行对象val pst = con.prepareStatement(sql)// 2.2 初始化 SQL 参数if (null != params && params.nonEmpty) {params.zipWithIndex.foreach(t => pst.setObject(t._2 + 1, t._1))}pst}try {val connect = con()val prepared = pst(connect)sql match {case sql if sql.matches("^(insert|INSERT|delete|DELETE|update|UPDATE) .*")=> Dml(prepared.executeUpdate())case sql if sql.matches("^(select|SELECT) .*")=> Dql(prepared.executeQuery())case _ => Ex(new SQLException(s"illegal sql command : $sql"))}} catch {case e: Exception => Ex(e)}}}
Test
import hbase.impl.MyHBase
import mysql.Jdbc._
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.util.Bytes
import java.utilobject Test {def main(args: Array[String]): Unit = {// 初始化MySQL JDBC操作函数val jdbcOpr: (String, Seq[Any]) => ResThree = jdbc(user = "root",url = "jdbc:mysql://localhost:3306/test_db_for_bigdata",password = "123456")// 执行SQL查询,并将结果封装在ResThree对象中val toEntity: ResThree = jdbcOpr("select * from test_table_for_hbase where test_id between ? and ?",Seq(2, 4))// 判断ResThree对象中的结果是否为异常if (toEntity.rst == Result.EX) {// 如果异常,执行异常结果处理toEntity.to[Ex]println("出现异常结果处理")} else {// 如果没有异常,将查询结果转换为HBase的Put对象列表val puts: util.List[Put] = toEntity.to[Dql].generate(rst => {// 创建一个Put对象,表示HBase中的一行val put = new Put(Bytes.toBytes(rst.getInt("test_id")), // row key设置为test_idSystem.currentTimeMillis() // 设置时间戳)// 向Put对象中添加列值// baseInfo是列族名,test_name、test_age、test_gender、test_phone是列名put.addColumn(Bytes.toBytes("baseInfo"), Bytes.toBytes("test_name"),Bytes.toBytes(rst.getString("test_name")))put.addColumn(Bytes.toBytes("baseInfo"), Bytes.toBytes("test_age"),Bytes.toBytes(rst.getString("test_age")) // 注意:这里假设test_age是字符串类型,但通常应为整数类型)put.addColumn(Bytes.toBytes("baseInfo"), Bytes.toBytes("test_gender"),Bytes.toBytes(rst.getString("test_gender")))put.addColumn(Bytes.toBytes("baseInfo"), Bytes.toBytes("test_phone"),Bytes.toBytes(rst.getString("test_phone")))// 返回构建好的Put对象put})// 如果有数据需要插入HBaseif (puts.size() > 0) {// 初始化HBase连接池并执行Put操作val exe = MyHBase(Map("hbase.zookeeper.quorum" -> "single01:2181"))(1).build().initPool().finish()// 执行Put操作,并返回是否成功val bool = exe.put("hbase_test:tranfer_from_mysql", puts)// 打印操作结果println(bool)} else {// 如果没有数据需要插入println("查无数据")}}}
}
相关文章:

Scala 练习一 将Mysql表数据导入HBase
Scala 练习一 将Mysql表数据导入HBase 续第一篇:Java代码将Mysql表数据导入HBase表 源码仓库地址:https://gitee.com/leaf-domain/data-to-hbase 一、整体介绍二、依赖三、测试结果四、源码 一、整体介绍 HBase特质 连接HBase, 创建HBase执行对象 初始化…...

前端工程化:基于Vue.js 3.0的设计与实践
这里写目录标题 《前端工程化:基于Vue.js 3.0的设计与实践》书籍引言本书概述主要内容作者简介为什么选择这本书?结语 《前端工程化:基于Vue.js 3.0的设计与实践》书籍 够买连接—>https://item.jd.com/13952512.html 引言 在前端技术日…...

Linux☞进程控制
在终端执行命令时,Linux会建立进程,程序执行完,进程会被终止;Linux是一个多任务的OS,允许多个进程并发运行; Linxu中启动进程的两种途径: ①手动启动(前台进程(命令gedit)...后台进程(命令‘&’)) ②…...
mybatis离谱bug乱转类型
字符串传入的参数被转成了int: Param("online") String online<if test"online 0">and (heart_time is null or heart_time <![CDATA[ < ]]> UNIX_TIMESTAMP(SUBDATE(now(),INTERVAL 8 MINUTE)) )</if><if test"…...

视频监控管理平台LntonCVS视频汇聚平台充电桩视频监控应用方案
随着新能源汽车的广泛使用,公众对充电设施的安全性和可靠性日益重视。为了提高充电桩的安全管理和站点运营效率,LntonCVS公司推出了一套全面的新能源汽车充电桩视频监控与管理解决方案。 该方案通过安装高分辨率摄像头,对充电桩及其周边区域进…...
VS环境Python:深度探索与实用指南
VS环境Python:深度探索与实用指南 在编程领域,VS环境(Visual Studio环境)与Python的结合,为开发者们提供了一种强大而灵活的开发体验。这种结合不仅提升了开发效率,还增强了代码的可读性和可维护性。然而&…...

SpringBoot整合SpringSecurit(二)通过token进行访问
在文章:SpringBoot整合SpringSecurit(一)实现ajax的登录、退出、权限校验-CSDN博客 里面,使用的session的方式进行保存用户信息的,这一篇文章就是使用token的方式。 在其上进行的改造,可以先看SpringBoot…...
【算法训练 day50 打家劫舍、打家劫舍Ⅱ、打家劫舍Ⅲ】
目录 一、打家劫舍-LeetCode 198思路 二、打家劫舍Ⅱ-LeetCode 213思路 三.打家劫舍Ⅲ-LeeCode 337思路 一、打家劫舍-LeetCode 198 Leecode链接: leetcode 198 思路 dp数组含义为:当前数组范围下能偷到的最多的钱。递推公式为:dp[j] max(dp[j-2]nums[j],dp[j-1…...

YOLOv8改进 | 卷积模块 | 在主干网络中添加/替换蛇形卷积Dynamic Snake Convolution
💡💡💡本专栏所有程序均经过测试,可成功执行💡💡💡 蛇形动态卷积是一种新型的卷积操作,旨在提高对细长和弯曲的管状结构的特征提取能力。它通过自适应地调整卷积核的权重࿰…...
深入解析力扣183题:从不订购的客户(LEFT JOIN与子查询方法详解)
关注微信公众号 数据分析螺丝钉 免费领取价值万元的python/java/商业分析/数据结构与算法学习资料 在本篇文章中,我们将详细解读力扣第183题“从不订购的客户”。通过学习本篇文章,读者将掌握如何使用SQL语句来解决这一问题,并了解相关的复杂…...

牛客NC32 求平方根【简单 二分 Java/Go/C++】
题目 题目链接: https://www.nowcoder.com/practice/09fbfb16140b40499951f55113f2166c 思路 Java代码 import java.util.*;public class Solution {/*** 代码中的类名、方法名、参数名已经指定,请勿修改,直接返回方法规定的值即可*** para…...

王道408数据结构CH3_栈、队列
概述 3.栈、队列和数组 3.1 栈 3.1.1 基本操作 3.1.2 顺序栈 #define Maxsize 50typedef struct{ElemType data[Maxsize];int top; }SqStack;3.1.3 链式栈 typedef struct LinkNode{ElemType data;struct LinkNode *next; }*LiStack;3.2 队列 3.2.1 基本操作 3.2.2 顺序存储…...

Angular 由一个bug说起之六:字体预加载
浏览器在加载一个页面时,会解析网页中的html和css,并开始加载字体文件。字体文件可以通过css中的font-face规则指定,并使用url()函数指定字体文件的路径。 比如下面这样: css font-face {font-family: MyFont;src: url(path/to/font.woff2…...

并查集进阶版
过关代码如下 #define _CRT_SECURE_NO_WARNINGS #include<bits/stdc.h> #include<unordered_set> using namespace std;int n, m; vector<int> edg[400005]; int a[400005], be[400005]; // a的作用就是存放要摧毁 int k; int fa[400005]; int daan[400005]…...

贪心(不相交的开区间、区间选点、带前导的拼接最小数问题)
目录 1.简单贪心 2.区间贪心 不相交的开区间 1.如何删除? 2.如何比较大小 区间选点问题 3.拼接最小数 1.简单贪心 比如:给你一堆数,你来构成最大的几位数 2.区间贪心 不相交的开区间 思路: 首先,如果有两个…...
[力扣题解] 617. 合并二叉树
题目:617. 合并二叉树 思路 递归法遍历,随便一种遍历方式都可以,以前序遍历为例; 代码 class Solution { public:TreeNode* mergeTrees(TreeNode* root1, TreeNode* root2) {if(root1 NULL){return root2;}if(root2 NULL){r…...

kafka-消费者组(SpringBoot整合Kafka)
文章目录 1、消费者组1.1、使用 efak 创建 主题 my_topic1 并建立6个分区并给每个分区建立3个副本1.2、创建生产者发送消息1.3、application.yml配置1.4、创建消费者监听器1.5、创建SpringBoot启动类1.6、屏蔽 kafka debug 日志 logback.xml1.7、引入spring-kafka依赖1.8、消费…...
Redisson知识
使用Redission获取锁 RLock lock redisson.getLock("my-lock"); 一、Redisson使用不指定锁过期时间的方式加锁: lock.lock(); 特点: 1.使用Redisson加的锁,具有自动续期机制,如果业务运行时间较长,运行…...
0103__【C/C++ 单线程性能分析工具 Gprof】 GNU的C/C++ 性能分析工具 Gprof 使用全面指南
【C/C 单线程性能分析工具 Gprof】 GNU的C/C 性能分析工具 Gprof 使用全面指南-CSDN博客...

如何把几个pdf文件合成在一个pdf文件
PDF合并,作为一种常见的文件处理方式,无论是在学术研究、工作汇报还是日常生活中,都有着广泛的应用。本文将详细介绍PDF合并的多种方法,帮助读者轻松掌握这一技能。 打开 “轻云处理pdf官网” 的网站,然后上传pdf。 pd…...

LBE-LEX系列工业语音播放器|预警播报器|喇叭蜂鸣器的上位机配置操作说明
LBE-LEX系列工业语音播放器|预警播报器|喇叭蜂鸣器专为工业环境精心打造,完美适配AGV和无人叉车。同时,集成以太网与语音合成技术,为各类高级系统(如MES、调度系统、库位管理、立库等)提供高效便捷的语音交互体验。 L…...

RocketMQ延迟消息机制
两种延迟消息 RocketMQ中提供了两种延迟消息机制 指定固定的延迟级别 通过在Message中设定一个MessageDelayLevel参数,对应18个预设的延迟级别指定时间点的延迟级别 通过在Message中设定一个DeliverTimeMS指定一个Long类型表示的具体时间点。到了时间点后…...
模型参数、模型存储精度、参数与显存
模型参数量衡量单位 M:百万(Million) B:十亿(Billion) 1 B 1000 M 1B 1000M 1B1000M 参数存储精度 模型参数是固定的,但是一个参数所表示多少字节不一定,需要看这个参数以什么…...
【位运算】消失的两个数字(hard)
消失的两个数字(hard) 题⽬描述:解法(位运算):Java 算法代码:更简便代码 题⽬链接:⾯试题 17.19. 消失的两个数字 题⽬描述: 给定⼀个数组,包含从 1 到 N 所有…...

聊聊 Pulsar:Producer 源码解析
一、前言 Apache Pulsar 是一个企业级的开源分布式消息传递平台,以其高性能、可扩展性和存储计算分离架构在消息队列和流处理领域独树一帜。在 Pulsar 的核心架构中,Producer(生产者) 是连接客户端应用与消息队列的第一步。生产者…...
渲染学进阶内容——模型
最近在写模组的时候发现渲染器里面离不开模型的定义,在渲染的第二篇文章中简单的讲解了一下关于模型部分的内容,其实不管是方块还是方块实体,都离不开模型的内容 🧱 一、CubeListBuilder 功能解析 CubeListBuilder 是 Minecraft Java 版模型系统的核心构建器,用于动态创…...
关于 WASM:1. WASM 基础原理
一、WASM 简介 1.1 WebAssembly 是什么? WebAssembly(WASM) 是一种能在现代浏览器中高效运行的二进制指令格式,它不是传统的编程语言,而是一种 低级字节码格式,可由高级语言(如 C、C、Rust&am…...

推荐 github 项目:GeminiImageApp(图片生成方向,可以做一定的素材)
推荐 github 项目:GeminiImageApp(图片生成方向,可以做一定的素材) 这个项目能干嘛? 使用 gemini 2.0 的 api 和 google 其他的 api 来做衍生处理 简化和优化了文生图和图生图的行为(我的最主要) 并且有一些目标检测和切割(我用不到) 视频和 imagefx 因为没 a…...

uniapp手机号一键登录保姆级教程(包含前端和后端)
目录 前置条件创建uniapp项目并关联uniClound云空间开启一键登录模块并开通一键登录服务编写云函数并上传部署获取手机号流程(第一种) 前端直接调用云函数获取手机号(第三种)后台调用云函数获取手机号 错误码常见问题 前置条件 手机安装有sim卡手机开启…...
Python Einops库:深度学习中的张量操作革命
Einops(爱因斯坦操作库)就像给张量操作戴上了一副"语义眼镜"——让你用人类能理解的方式告诉计算机如何操作多维数组。这个基于爱因斯坦求和约定的库,用类似自然语言的表达式替代了晦涩的API调用,彻底改变了深度学习工程…...