当前位置: 首页 > news >正文

flink: 将接收到的tcp文本流写入HBase

一、依赖:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>org.example</groupId><artifactId>pulsar-demo2</artifactId><version>1.0-SNAPSHOT</version><properties><maven.compiler.source>8</maven.compiler.source><maven.compiler.target>8</maven.compiler.target><pulsar.version>2.8.0</pulsar.version><jackson.version>2.10.5</jackson.version><!--<jackson.version>2.6.7</jackson.version>--></properties><dependencies><dependency><groupId>org.apache.pulsar</groupId><artifactId>pulsar-client-all</artifactId><version>${pulsar.version}</version></dependency><dependency><groupId>org.apache.pulsar</groupId><artifactId>pulsar-client-kafka</artifactId><version>${pulsar.version}</version></dependency><dependency><groupId>org.apache.pulsar</groupId><artifactId>pulsar-spark</artifactId><version>${pulsar.version}</version><exclusions><exclusion><groupId>org.apache.spark</groupId><artifactId>spark-streaming_2.10</artifactId></exclusion></exclusions></dependency><!--<dependency><groupId>org.apache.spark</groupId><artifactId>spark-streaming_2.11</artifactId><version>2.4.0</version></dependency>--><dependency><groupId>com.fasterxml.jackson.core</groupId><artifactId>jackson-core</artifactId><version>${jackson.version}</version></dependency><dependency><groupId>com.fasterxml.jackson.core</groupId><artifactId>jackson-annotations</artifactId><version>${jackson.version}</version></dependency><dependency><groupId>com.fasterxml.jackson.core</groupId><artifactId>jackson-databind</artifactId><version>${jackson.version}</version></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-streaming_2.12</artifactId><version>3.0.0</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java_2.12</artifactId><version>1.13.6</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-planner-blink_2.12</artifactId><version>1.13.6</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-java</artifactId><version>1.13.6</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-csv</artifactId><version>1.13.6</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients_2.12</artifactId><version>1.13.6</version></dependency><dependency><groupId>org.apache.hbase</groupId><artifactId>hbase-client</artifactId><version>2.4.2</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-hbase-1.4_2.12</artifactId><version>1.13.6</version></dependency></dependencies></project>

二、HBase中建表:

create 'hbasetable','family1','family2','family3','family4'

三、在一台服务器上开启nc

nc -lk 9999

四、运行,demo程序

package cn.edu.tju;import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.descriptors.Csv;
import org.apache.flink.table.descriptors.FileSystem;
import org.apache.flink.table.descriptors.Schema;import java.util.UUID;public class FlinkHBase3 {
//nc 服务器地址private static String HOST_NAME = "xx.xx.xx.xx";private static int PORT = 9999;private static String DELIMITER ="\n";public static void main(String[] args) throws Exception{StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);DataStream<String> socketDataInfo =  env.socketTextStream(HOST_NAME, PORT, DELIMITER);SingleOutputStreamOperator<DataInfo> dataInfoStream = socketDataInfo.map(new MapFunction<String, DataInfo>() {@Overridepublic DataInfo map(String value) throws Exception {String[] stringList = value.split(",");DataInfo dataInfo = new DataInfo(UUID.randomUUID().toString(), Long.parseLong(stringList[0]), stringList[1], Double.parseDouble(stringList[2]));return dataInfo;}});Table dataTable = tableEnv.fromDataStream(dataInfoStream,"rowkey,ts,info,val");tableEnv.createTemporaryView("dataTable", dataTable);// 这里要配自己HBase的zookeeper地址tableEnv.executeSql("CREATE TABLE flinkTable (\n" +" rowkey STRING,\n" +" family1 ROW<ts BIGINT, info STRING, val DOUBLE>,\n" +" PRIMARY KEY (rowkey) NOT ENFORCED\n" +") WITH (\n" +" 'connector' = 'hbase-1.4',\n" +" 'table-name' = 'hbasetable',\n" +" 'zookeeper.quorum' = 'xx.xx.xx.xx:2181'\n" +")");tableEnv.executeSql("INSERT INTO flinkTable " +"SELECT rowkey, ROW(ts,info,val) FROM dataTable");env.execute("HBaseFlinkJob");}public static class DataInfo{private String rowkey;private Long ts;private String info;private double val;public String getRowkey() {return rowkey;}public void setRowkey(String rowkey) {this.rowkey = rowkey;}public Long getTs() {return ts;}public void setTs(Long ts) {this.ts = ts;}public String getInfo() {return info;}public void setInfo(String info) {this.info = info;}public double getVal() {return val;}public void setVal(double val) {this.val = val;}@Overridepublic String toString() {return "DataInfo{" +"ts=" + ts +", info='" + info + '\'' +", val='" + val + '\'' +'}';}public DataInfo( String rowkey, Long ts, String info, double val) {this.rowkey = rowkey;this.ts = ts;this.info = info;this.val = val;}public DataInfo() {}}}

五、在nc窗口输入:

1689999832,dong,32.45

六、在HBase检查数据是否已经写入:

scan 'hbasetable'

在这里插入图片描述

相关文章:

flink: 将接收到的tcp文本流写入HBase

一、依赖&#xff1a; <?xml version"1.0" encoding"UTF-8"?> <project xmlns"http://maven.apache.org/POM/4.0.0"xmlns:xsi"http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation"http://maven.apache.o…...

SpringBoot集成knife4j

SpringBoot集成knife4j 1、什么是Knife4j2、SpringBoor整合Knife4j2.1、Knife4j配置方式12.2 配置方式二2.3、写注解2.4、效果 1、什么是Knife4j 在日常开发中&#xff0c;写接口文档是我们必不可少的&#xff0c;而Knife4j就是一个接口文档工具&#xff0c;可以看作是Swagger…...

Vue3之setup方法

Vue 3 的 setup 方法是 Vue Composition API 的一部分&#xff0c;用于组织和复用 Vue 组件的逻辑代码。Vue Composition API 允许您以更具响应性和函数式的方式来组织和复用 Vue 组件中的代码&#xff0c;特别是在处理复杂逻辑或跨组件共享逻辑时非常有用。 以下是关于 setup…...

MySQL常见索引及其创建

MySQL索引 在 MySQL 数据库中&#xff0c;常见的索引类型包括以下几种&#xff1a; 普通索引&#xff08;Normal Index&#xff09;&#xff1a;最基本的索引类型&#xff0c;没有任何限制。唯一索引&#xff08;Unique Index&#xff09;&#xff1a;要求索引列的值是唯一的…...

高效测量“芯”搭档 | ACM32激光测距仪应用方案

激光测距仪概述 激光测距仪是利用激光对目标的距离进行准确测定的仪器。激光测距仪在工作时向目标射出一束很细的激光&#xff0c;由光电元件接收目标反射的激光束&#xff0c;计时器测定激光束从发射到接收的时间&#xff0c;计算出从观测者到目标的距离。激光测距仪分为手持激…...

基于Hive大数据分析springboot为后端以及vue为前端的的民宿系

标题基于Hive大数据分析springboot为后端以及vue为前端的的民宿系 本文介绍了如何利用Hive进行大数据分析,并结合Spring Boot和Vue构建了一个民宿管理系统。该民民宿管理系统包含用户和管理员登陆注册的功能,发布下架酒店信息,模糊搜索,酒店详情信息展示,收藏以及对收藏的…...

pnpm、monorepo分包管理、多包管理、npm、vite、前端工程化、保姆级教程

浅尝pnpm monorepo 多包管理方案 &#x1f4a1;tips: 创建pnpm monorope多包管理框架流程 初始化 mkdir taurus & cd taurus pnpm init创建基础文件 创建文件pnpm-workspace.yaml packages:- packages/**创建文件夹packages/ -packages/ -package.json -pnpm-workspace…...

vue3封装Element分页

配置当前页 配置每页条数 页面改变、每页条数改变都触发回调 封装分页 Pagination.vue <template><el-paginationbackgroundv-bind"$attrs":page-sizes"pageSizes"v-model:current-page"page"v-model:page-size"pageSize":t…...

真机 ARM64 架构转模拟器 ARM64 架构

本文字数&#xff1a;2051字 预计阅读时间&#xff1a;15分钟 01 需要转换架构的原因 老版 Mac 使用 Intel 芯片&#xff0c;是x86_64架构&#xff0c;相应地在老版 Mac 上运行的模拟器使用的也就是 x86_64架构。 由于模拟器的 x86_64 架构与真机的 arm64、armv7 等架构不冲突&…...

敏捷教练CSM认证考了有没有用,谁说了算?

敏捷教练CSM证书是近年来备受关注的一项证书&#xff0c;它被认为可以提升敏捷开发团队的管理能力和项目执行效率。然而&#xff0c;对于这个证书的价值和含金量&#xff0c;人们的观点却不尽相同。那么&#xff0c;CSM证书到底有没有用&#xff0c;谁来说了算呢&#xff1f; 首…...

Docker-Container

Docker ①什么是容器②为什么需要容器③容器的生命周期容器 OOM容器异常退出容器暂停 ④容器命令清单总览docker createdocker rundocker psdocker logsdocker attachdocker execdocker startdocker stopdocker restartdocker killdocker topdocker statsdocker container insp…...

下载安装anaconda和pytorch的详细方法,以及遇到的问题和解决办法

下载安装Anaconda 首先需要下载Anaconda&#xff0c;可以到官网Anaconda官网或者这里提供一个镜像网站去下载anaconda镜像网站 安装步骤可参考该文章&#xff1a;Anaconda安装步骤&#xff0c;本篇不再赘述 注意环境变量的配置&#xff0c;安装好Anaconda之后一定要在环境变量…...

2020年天津市二级分类土地利用数据(矢量)

天津市&#xff0c;位于华北平原海河五大支流汇流处&#xff0c;东临渤海&#xff0c;北依燕山。地势以平原和洼地为主&#xff0c;北部有低山丘陵&#xff0c;海拔由北向南逐渐下降&#xff0c;地貌总轮廓为西北高而东南低。天津有山地、丘陵和平原三种地形&#xff0c;平原约…...

设计模式——结构型——外观模式Facade

处理器类 public class Cpu {public void start() {System.out.println("处理器启动了...");} } 内存类 public class Memory {public void start() {System.out.println("内存启动了...");} } 硬盘类 public class Disk {public void start() {Syste…...

OpenGL的MVP矩阵理解

OpenGL的MVP矩阵理解 右手坐标系 右手坐标系与左手坐标系都是三维笛卡尔坐标系&#xff0c;他们唯一的不同在于z轴的方向&#xff0c;如下图&#xff0c;左边是左手坐标系&#xff0c;右边是右手坐标系 OpenGL中一般用的是右手坐标系 1.模型坐标系&#xff08;Local Space&…...

前端超分辨率技术应用:图像质量提升与场景实践探索-设计篇

超分辨率&#xff01; 引言 在数字化时代&#xff0c;图像质量对于用户体验的重要性不言而喻。随着显示技术的飞速发展&#xff0c;尤其是移动终端视网膜屏幕的广泛应用&#xff0c;用户对高分辨率、高质量图像的需求日益增长。然而&#xff0c;受限于网络流量、存储空间和图像…...

C++11入门手册第一节,学完直接上手Qt(共两节)

入门 hello.cpp #include <iostream>int main() { std::cout << "Hello Quick Reference\n"<<endl; return 0;} 编译运行 $ g hello.cpp -o hello$ ./hello​Hello Quick Reference 变量 int number 5; // 整数float f 0.95; //…...

Docker部署MinIO对象存储服务

1. 拉取MinIO镜像 # 下载镜像 docker pull minio/minio#查看镜像 docker images2. 创建目录 # 文件存储目录 mkdir -p /opt/minio/data# 配置文件 mkdir -p /opt/minio/config# 日志文件 mkdir -p /opt/minio/logs3. 创建Minio容器并运行 docker run \ -p 9000:9000 \ -p 90…...

基于Echarts的超市销售可视化分析系统(数据+程序+论文)

本论文旨在研究Python技术和ECharts可视化技术在超市销售数据分析系统中的应用。本系统通过对超市销售数据进行分析和可视化展示&#xff0c;帮助决策层更好地了解销售情况和趋势&#xff0c;进而做出更有针对性的决策。本系统主要包括数据处理、数据可视化和系统测试三个模块。…...

使用ai智能写作场景之gpt整理资料,如何ai智能写作整理资料

Ai智能写作助手&#xff1a;Ai智能整理资料小助手 Ai智能整理资料小助手可试用3天&#xff01; 通俗的解释一下怎么用ChatGPT来进行资料整理&#xff1a; 搜寻并获取指定数量的特定领域文章&#xff1a; 想像你在和我说话一样&#xff0c;告诉我你想要多少篇关于某个话题的文…...

C/C++ 内存管理

1、C/C内存分布 首先我们来了解在一个程序中&#xff0c;代码主要存储在哪些地方&#xff1b; 1.栈&#xff1a;又叫堆栈&#xff0c;其中一般存储非静态局部变量、函数参数、返回值等&#xff0c;栈的增长是向下的。 2.内存映射段&#xff1a;是高效的 I/O 映射方式&#xff0…...

android pdf框架-10,相册浏览

MupdfViewer 这是最后apk,源码在前面的文章已经贴过了本站下载地址,只是不是最新的.可能不少是旧的内容. subsampling-scale-image-view这是一个大图片的分块加载的实现.比较不错的.滑动方面我觉得使用flinger的效果比它要流畅,惯性要好. 也有人把这个作成pdf渲染器.但翻页就…...

基于SSM的高校普法系统(有报告)。Javaee项目。ssm项目。

演示视频&#xff1a; 基于SSM的高校普法系统&#xff08;有报告&#xff09;。Javaee项目。ssm项目。 项目介绍&#xff1a; 采用M&#xff08;model&#xff09;V&#xff08;view&#xff09;C&#xff08;controller&#xff09;三层体系结构&#xff0c;通过Spring Spri…...

数据结构刷题篇 之 【力扣二叉树基础OJ】详细讲解(含每道题链接及递归图解)

有没有一起拼用银行卡的&#xff0c;取钱的时候我用&#xff0c;存钱的时候你用 1、相同的树 难度等级&#xff1a;⭐ 直达链接&#xff1a;相同的树 2、单值二叉树 难度等级&#xff1a;⭐ 直达链接&#xff1a;单值二叉树 3、对称二叉树 难度等级&#xff1a;⭐⭐ 直达…...

Jackson 2.x 系列【6】注解大全篇二

有道无术&#xff0c;术尚可求&#xff0c;有术无道&#xff0c;止于术。 本系列Jackson 版本 2.17.0 源码地址&#xff1a;https://gitee.com/pearl-organization/study-jaskson-demo 文章目录 注解大全2.11 JsonValue2.12 JsonKey2.13 JsonAnySetter2.14 JsonAnyGetter2.15 …...

在低成本loT mcu上实现深度神经网络端到端自动部署-深度神经网络、物联网、边缘计算、DNN加速——文末完整资料

目录 前言 DNN 量化神经网络 并行超低功耗计算范式 面向内存的部署 结果 原文与源码下载链接 REFERENCES 前言 在物联网极端边缘的终端节点上部署深度神经网络( Deep Neural Networks&#xff0c;DNNs )是支持普适深度学习增强应用的关键手段。基于低成本MCU的终端节点…...

【linux】基础IO |文件操作符

需要掌握&#xff1a;操作文件&#xff0c;本质&#xff1a;进程操作文件。进程和文件的关系 向文件中写入&#xff0c;本质上向硬件中写入->用户没有权利直接写入->操作系统是硬件的管理者&#xff0c;我们可以通过操作系统往硬件写入->操作系统必须提供系统调用&…...

探索 2024 年 Web 开发最佳前端框架

前端框架通过简化和结构化的网站开发过程改变了 Web 开发人员设计和实现用户界面的方法。随着 Web 应用程序变得越来越复杂&#xff0c;交互和动画功能越来越多&#xff0c;这是开发前端框架的初衷之一。 在网络的早期&#xff0c;网页相当简单。它们主要以静态 HTML 为特色&a…...

解决: MAC ERROR [internal] load metadata for docker.io/library/openjdk:17

错误信息&#xff1a; ERROR [internal] load metadata for docker.io/library/openjdk:17 ERROR: failed to solve: openjdk:17: error getting credentials - err: exit status 1, out: 解决方法&#xff1a; running this command rm ~/.docker/config.json before …...

View事件分发

MotionEvent 1.简介 MotionEvent 是Android系统中一个非常重要的类&#xff0c;它代表了屏幕上发生的触摸事件。当用户在屏幕上触摸、滑动或者长按时&#xff0c;都会生成一个MotionEvent对象&#xff0c;这个对象包含了触摸动作的各种信息。 2.事件类型 ACTION_DOWN&#x…...

广州优化网站建设/免费推广网站注册入口

在TCP层&#xff0c;有个FLAGS字段&#xff0c;这个字段有以下几个标识&#xff1a;SYN, FIN, ACK, PSH, RST, URG. 其中&#xff0c;对于我们日常的分析有用的就是前面的五个字段。 它们的含义是&#xff1a; SYN表示建立连接&#xff0c; FIN表示关闭连接&#xff0c; ACK表示…...

微信网站公众平台/新浪体育世界杯

Centos是rpm和yum rpm相关 sudo apt install rpmrpm -qa&#xff1a;查询所安装的所有rpm包 rpm -q 软件包名&#xff1a;查询软件是否安装rpm -qi 软件包名&#xff1a;查询到安装软件的信息 rpm -ql 软件包名&#xff1a;查询软件包安装了哪些文件&#xff0c;安装到了哪里…...

如何做网站的链接结构/南宁网站推广哪家好

Ctrl Shift “” Ctrl Shift “-”...

Wordpress html5 动画/万能优化大师下载

今天要动态添加几条数据如果使用recyclerview就不划算和不够简洁。于是在网上看了addview的使用。 于是就开始来使用了&#xff0c;结果直接。。。。无法形式怎么回事&#xff0c;因为是在OkhttpUtils工具中使用的&#xff0c;以为是更新ui是要在线程中进行的。后面一想不对&a…...

重庆搜索引擎优化/厦门seo排名

Jenkins项目团队决定在稳定性和为Kubernetes等平台提供更好的支持方面分配一些工作量。前者可能会发生一些向后不兼容的变更&#xff0c;将影响发布模型并提供具有更多预置选项的版本&#xff0c;而后者将在与现有Jenkins X项目齐头并进。\u0026#xD;\u0026#xD;Jenkins目前在处理…...

福建省人民政府第七办公室/武汉网络推广seo

同步事件1.声明事件//负责传递消息public delegate void MethodCall(string message);public static event MethodCall requestdata; 2.注册事件Page_Load事件中注册事件 requestdata new MethodCall(FormDataGridViewDataTable_requestdata); 3.显式的触发事件private void bu…...