当前位置: 首页 > news >正文

DataX DorisWriter 插件DorisStreamLoadObserver类详细解读

DorisStreamLoadObserver 类是一个用于将数据加载到 Doris(以前称为 Palo)数据库中并监视加载过程的 Java 类。该类提供了一组方法,用于构建 HTTP 请求、处理 HTTP 响应以及监控数据加载的状态。以下是每个方法的具体作用:

  1. DorisStreamLoadObserver(Keys options): 这是类的构造函数,用于初始化加载数据所需的配置选项。
  2. void streamLoad(WriterTuple data) throws Exception: 该方法是数据加载的主要方法。它将给定的数据(WriterTuple 对象)加载到 Doris 数据库中。它构建了用于将数据发送到 Doris 的 HTTP 请求,并根据响应状态来确定加载是否成功。如果加载失败,它会抛出异常。
  3. private void checkStreamLoadState(String host, String label) throws IOException: 这个方法用于检查数据加载的状态。它会不断地轮询 Doris 服务器,以获取特定加载任务的最终状态。根据加载状态的不同,它可能会抛出异常或者在加载完成时返回。
  4. private byte[] addRows(List<byte[]> rows, int totalBytes): 此方法根据给定的数据行和总字节数,构建用于加载的字节数组。它根据配置中的数据格式(CSV 或 JSON)将数据行连接起来,并添加适当的分隔符。
  5. private Map<String, Object> put(String loadUrl, String label, byte[] data) throws IOException: 该方法执行 HTTP PUT 请求,将数据加载到 Doris 数据库中。它构建了包含数据的请求实体,发送到指定的加载 URL,并解析响应以获取加载结果。
  6. private String getBasicAuthHeader(String username, String password): 此方法用于生成基本身份验证头部,以便在 HTTP 请求中进行身份验证。
  7. private HttpEntity getHttpEntity(CloseableHttpResponse response): 这是一个实用方法,用于从 HTTP 响应中提取实体内容。
  8. private String getLoadHost(): 该方法从配置选项中获取用于加载数据的主机地址列表,并尝试连接到这些主机以检查其可用性。它会返回第一个可用的主机地址。

DorisStreamLoadObserver 类主要用于处理数据加载任务,它负责构建适当的 HTTP 请求,将数据发送到 Doris 数据库,并监控加载任务的状态。通过这些方法,可以实现将数据从外部系统加载到 Doris 数据库中,并在加载过程中进行必要的状态检查和错误处理。

import org.apache.commons.codec.binary.Base64;
import org.apache.http.HttpEntity;
import org.apache.http.HttpHeaders;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpPut;
import org.apache.http.entity.ByteArrayEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.DefaultRedirectStrategy;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.util.EntityUtils;
import org.json.simple.JSONValue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;public class DorisStreamLoadObserver {private static final Logger LOG = LoggerFactory.getLogger(DorisStreamLoadObserver.class);private Keys options;private long pos;private static final String RESULT_FAILED = "Fail";private static final String RESULT_LABEL_EXISTED = "Label Already Exists";private static final String LAEBL_STATE_VISIBLE = "VISIBLE";private static final String LAEBL_STATE_COMMITTED = "COMMITTED";private static final String RESULT_LABEL_PREPARE = "PREPARE";private static final String RESULT_LABEL_ABORTED = "ABORTED";private static final String RESULT_LABEL_UNKNOWN = "UNKNOWN";public DorisStreamLoadObserver(Keys options) {this.options = options;}// 数据写入 Doris 的主要方法public void streamLoad(WriterTuple data) throws Exception {String host = getLoadHost();if (host == null) {throw new IOException("load_url cannot be empty, or the host cannot connect. Please check your configuration.");}String loadUrl = new StringBuilder(host).append("/api/").append(options.getDatabase()).append("/").append(options.getTable()).append("/_stream_load").toString();LOG.info("Start to join batch data: rows[{}] bytes[{}] label[{}].", data.getRows().size(), data.getBytes(), data.getLabel());Map<String, Object> loadResult = put(loadUrl, data.getLabel(), addRows(data.getRows(), data.getBytes().intValue()));LOG.info("StreamLoad response :{}", JSONValue.toJSONString(loadResult));final String keyStatus = "Status";if (null == loadResult || !loadResult.containsKey(keyStatus)) {throw new IOException("Unable to flush data to Doris: unknown result status.");}LOG.debug("StreamLoad response:{}", JSONValue.toJSONString(loadResult));if (RESULT_FAILED.equals(loadResult.get(keyStatus))) {throw new IOException(new StringBuilder("Failed to flush data to Doris.\n").append(JSONValue.toJSONString(loadResult)).toString());} else if (RESULT_LABEL_EXISTED.equals(loadResult.get(keyStatus))) {LOG.debug("StreamLoad response:{}", JSONValue.toJSONString(loadResult));checkStreamLoadState(host, data.getLabel());}}// 检查数据加载状态的方法private void checkStreamLoadState(String host, String label) throws IOException {int idx = 0;while (true) {try {TimeUnit.SECONDS.sleep(Math.min(++idx, 5));} catch (InterruptedException ex) {break;}try (CloseableHttpClient httpclient = HttpClients.createDefault()) {HttpGet httpGet = new HttpGet(new StringBuilder(host).append("/api/").append(options.getDatabase()).append("/get_load_state?label=").append(label).toString());httpGet.setHeader("Authorization", getBasicAuthHeader(options.getUsername(), options.getPassword()));httpGet.setHeader("Connection", "close");try (CloseableHttpResponse resp = httpclient.execute(httpGet)) {HttpEntity respEntity = getHttpEntity(resp);if (respEntity == null) {throw new IOException(String.format("Failed to flush data to Doris, Error " +"could not get the final state of label[%s].\n", label), null);}Map<String, Object> result = (Map<String, Object>) JSONValue.parse(EntityUtils.toString(respEntity));String labelState = (String) result.get("state");if (null == labelState) {throw new IOException(String.format("Failed to flush data to Doris, Error " +"could not get the final state of label[%s]. response[%s]\n", label, EntityUtils.toString(respEntity)), null);}LOG.info(String.format("Checking label[%s] state[%s]\n", label, labelState));switch (labelState) {case LAEBL_STATE_VISIBLE:case LAEBL_STATE_COMMITTED:return;case RESULT_LABEL_PREPARE:continue;case RESULT_LABEL_ABORTED:throw new DorisWriterExcetion(String.format("Failed to flush data to Doris, Error " +"label[%s] state[%s]\n", label, labelState), null, true);case RESULT_LABEL_UNKNOWN:default:throw new IOException(String.format("Failed to flush data to Doris, Error " +"label[%s] state[%s]\n", label, labelState), null);}}}}}// 根据格式将数据行拼接成字节数组private byte[] addRows(List<byte[]> rows, int totalBytes) {if (Keys.StreamLoadFormat.CSV.equals(options.getStreamLoadFormat())) {Map<String, Object> props = (options.getLoadProps() == null ? new HashMap<>() : options.getLoadProps());byte[] lineDelimiter = DelimiterParser.parse((String) props.get("line_delimiter"), "\n").getBytes(StandardCharsets.UTF_8);ByteBuffer bos = ByteBuffer.allocate(totalBytes + rows.size() * lineDelimiter.length);for (byte[] row : rows) {bos.put(row);bos.put(lineDelimiter);}return bos.array();}if (Keys.StreamLoadFormat.JSON.equals(options.getStreamLoadFormat())) {ByteBuffer bos = ByteBuffer.allocate(totalBytes + (rows.isEmpty() ? 2 : rows.size() + 1));bos.put("[".getBytes(StandardCharsets.UTF_8));byte[] jsonDelimiter = ",".getBytes(StandardCharsets.UTF_8);boolean isFirstElement = true;for (byte[] row : rows) {if (!isFirstElement) {bos.put(jsonDelimiter);}bos.put(row);isFirstElement = false;}bos.put("]".getBytes(StandardCharsets.UTF_8));return bos.array();}throw new RuntimeException("Failed to join rows data, unsupported `format` from stream load properties:");}private Map<String, Object> put(String loadUrl, String label, byte[] data) throws IOException {RequestConfig requestConfig = RequestConfig.custom().setSocketTimeout(120 * 1000).setConnectTimeout(120 * 1000).setConnectionRequestTimeout(120 * 1000).build();try (CloseableHttpClient httpclient = HttpClientBuilder.create().setDefaultRequestConfig(requestConfig).setRedirectStrategy(new DefaultRedirectStrategy()).build()) {HttpPut httpPut = new HttpPut(loadUrl);httpPut.setHeader(HttpHeaders.CONTENT_TYPE, "application/octet-stream");httpPut.setHeader("Authorization", getBasicAuthHeader(options.getUsername(), options.getPassword()));httpPut.setEntity(new ByteArrayEntity(data));try (CloseableHttpResponse resp = httpclient.execute(httpPut)) {HttpEntity respEntity = getHttpEntity(resp);if (respEntity == null) {throw new IOException("Failed to flush data to Doris, Error could not get the response entity.");}return (Map<String, Object>) JSONValue.parse(EntityUtils.toString(respEntity));}}}// 构造 HTTP 请求中的基本认证头部private String getBasicAuthHeader(String username, String password) {String credentials = username + ":" + password;byte[] credentialsBytes = credentials.getBytes(StandardCharsets.UTF_8);String base64Credentials = Base64.encodeBase64String(credentialsBytes);return "Basic " + base64Credentials;}// 从 HTTP 响应中获取实体内容private HttpEntity getHttpEntity(CloseableHttpResponse response) {if (response != null) {return response.getEntity();}return null;}// 获取用于加载数据的主机地址private String getLoadHost() {List<String> hosts = options.getDorisStreamLoadUrls();for (String host : hosts) {try {HttpURLConnection connection = (HttpURLConnection) new URL(host).openConnection();connection.setRequestMethod("HEAD");int responseCode = connection.getResponseCode();if (responseCode == HttpURLConnection.HTTP_OK) {return host;}} catch (IOException e) {LOG.warn("Failed to connect to host: {}", host);}}return null;}
}

相关文章:

DataX DorisWriter 插件DorisStreamLoadObserver类详细解读

DorisStreamLoadObserver 类是一个用于将数据加载到 Doris&#xff08;以前称为 Palo&#xff09;数据库中并监视加载过程的 Java 类。该类提供了一组方法&#xff0c;用于构建 HTTP 请求、处理 HTTP 响应以及监控数据加载的状态。以下是每个方法的具体作用&#xff1a; Doris…...

leetcode:1710. 卡车上的最大单元数(python3解法)

难度&#xff1a;简单 请你将一些箱子装在 一辆卡车 上。给你一个二维数组 boxTypes &#xff0c;其中 boxTypes[i] [numberOfBoxesi, numberOfUnitsPerBoxi] &#xff1a; numberOfBoxesi 是类型 i 的箱子的数量。numberOfUnitsPerBoxi 是类型 i 每个箱子可以装载的单元数量。…...

Spring_JDBC的使用

Spring 是个一站式框架&#xff1a;Spring 自身也提供了控制层的 SpringMVC和持久层的 Spring JdbcTemplate。 配置信息 1.下载 Spring JdbcTemplate 的 jar 包,在pom.xml中导入 <dependency><groupId>org.springframework</groupId><artifactId>spr…...

【Python从入门到进阶】34、selenium基本概念及安装流程

接上篇《33、使用bs4获取星巴克产品信息》 上一篇我们介绍了如何使用bs4来解析星巴克网站&#xff0c;获取其产品信息。本篇我们来了解selenium技术的基础。 一、什么是selenium&#xff1f; Selenium是一种用于自动化Web浏览器操作的开源工具。它提供了一组API&#xff08;应…...

如何确保ChatGPT在文本生成中遵循道德和伦理准则?

确保ChatGPT在文本生成中遵循道德和伦理准则是一个复杂而重要的任务。人工智能&#xff08;AI&#xff09;系统&#xff0c;特别是语言模型&#xff0c;具有强大的生成能力&#xff0c;但如果不受到道德和伦理准则的约束&#xff0c;可能会导致一系列问题&#xff0c;包括歧视、…...

RISC-V Linux系统rootfs制作

文章目录 1、下载2、配置与编译3、运行 buildroot 是一个构建嵌入式Linux系统的框架。整个 buildroot 是由Makefile(*.mk) 脚本和 Kconfig(Config.in) 配置文件构成的&#xff0c;因此可以像配置 Linux 内核一样执行 make menuconfig 进行配置&#xff0c;编译出一个完整的、可…...

git常用场景记录 | 拉取远程分支A合并到本地分支B

文章目录 git常用场景记录拉取远程分支A合并到本地分支B本地分支B存在未add与commit的代码 git常用场景记录 doing&#xff0c;最后更新9.1 拉取远程分支A合并到本地分支B 需求描述 在团队合作时&#xff0c;我自己的本地分支B功能已经实现并合并到feature&#xff0c;之后发现…...

如何利用Linux进行数据管理和分析?

Linux是一款非常强大的操作系统&#xff0c;它不仅可以帮助你管理数据&#xff0c;还可以让你成为一名数据分析大师。只要你会使用命令行&#xff0c;你就可以用Linux进行数据管理和分析。 现在&#xff0c;让我们来看看如何使用Linux进行数据管理。 使用sort命令对数据进行排…...

vue3封装echarts图表数据无法渲染到页面

问题是后端的数据已经成功返回到前端了&#xff0c;但是Echarts图表一直不能被渲染&#xff0c;卡了一个多小时&#xff0c;最后问gpt才解决&#xff08;gptyyds&#xff01;&#xff01;&#xff01;&#xff09; methods: {loadGet() {this.$axios.get(this.$httpUrl /goods…...

MySQL索引,事务和存储引擎

一、索引 1、索引的概念 ●索引是一个排序的列表&#xff0c;在这个列表中存储着索引的值和包含这个值的数据所在行的物理地址&#xff08;类似于C语言的链表通过指针指向数据记录的内存地址&#xff09;。 ●使用索引后可以不用扫描全表来定位某行的数据&#xff0c;而是先…...

开发指导—利用CSS动画实现HarmonyOS动效(一)

注&#xff1a;本文内容分享转载自 HarmonyOS Developer 官网文档 一. CSS 语法参考 CSS 是描述 HML 页面结构的样式语言。所有组件均存在系统默认样式&#xff0c;也可在页面 CSS 样式文件中对组件、页面自定义不同的样式。请参考通用样式了解兼容 JS 的类 Web 开发范式支持的…...

电商项目part10 高并发缓存实战

缓存的数据一致性 只要使用到缓存&#xff0c;无论是本地内存做缓存还是使用 redis 做缓存&#xff0c;那么就会存在数据同步的问题。 先读缓存数据&#xff0c;缓存数据有&#xff0c;则立即返回结果&#xff1b;如果没有数据&#xff0c;则从数据库读数据&#xff0c;并且把…...

MongoDB实验——MongoDB shell操作

MongoDB shell操作 实验原理 MongoDB shell是一个可执行文件&#xff0c;是MongoDB自带的一个交互式JavaScript shell&#xff0c;位于MongoDB安装路径下的/bin文件夹中。要启动MongoDB shell&#xff0c;可执行命令mongo。这将在控制台提示符中启动该shell&#xff0c;Mongo…...

数据分析师职业发展道路,工作内容是什么?

很多同学问&#xff0c;参加数据分析就业班后之的就业发展道路是怎样的&#xff0c;工作又能做什么呢&#xff1f; 市面上的常见的工作类型有有运营类、技术类及分析类等&#xff0c;可以根据自己的意愿去做适合自己的工作&#xff0c;但是任何工作其实都是需要一技之长。…...

Vue3 + ts的使用

一. IDE的配置 1. VSCode 插件安装搜索builtin typescript 2. 点击“TypeScript and JavaScript Language Features”右下角的小齿轮&#xff0c;然后选择“Disable (Workspace)” 3. 重新加载工作空间。Takeover 模式将会在你打开一个 Vue 或者 TS 文件时自动启用。 二. 依赖的…...

CF Edu152 C

Problem - C - Codeforces 题意&#xff1a; 思路&#xff1a; 首先&#xff0c;观察样例可知 这种是等效的 推广一下 0000.....111111 ..l..............r...... 这种是等效的 容易想到维护后面第一个1的位置和前面第一个0的位置&#xff0c;然后把所有区间都等效一下&…...

iBooker 技术评论 20230902

一、女子同时供职 16 家公司却从不上班&#xff0c;全国骗薪群体至少有七八百人&#xff0c;为何会出现此类骗薪群体&#xff1f; 社保其实很好绕过。就是这些骗薪者一起创立一个外包公司&#xff0c;然后通过这个公司把自己外包出去。这些人和外包公司签的是劳务合同&#xf…...

视频动态壁纸 Dynamic Wallpaper for Mac中文

Dynamic Wallpaper是一款Mac平台上的动态壁纸应用程序&#xff0c;它可以根据时间等因素动态切换壁纸&#xff0c;提供更加生动和多样化的桌面体验。 Dynamic Wallpaper包含了多个动态壁纸&#xff0c;用户可以根据自己的喜好选择和切换。这些动态壁纸可以根据时间等因素进行自…...

Java“牵手”京东商品列表数据,关键词搜索京东商品数据接口,京东API申请指南

京东商城是一个网上购物平台&#xff0c;售卖各类商品&#xff0c;包括服装、鞋类、家居用品、美妆产品、电子产品等。要获取京东商品列表和商品详情页面数据&#xff0c;您可以通过开放平台的接口或者直接访问京东商城的网页来获取商品详情信息。以下是两种常用方法的介绍&…...

springboot实战(三)之多环境部署配置文件生效方式

环境&#xff1a; jdk&#xff1a;1.8 springboot版本&#xff1a;2.7.15 配置&#xff1a; 1.新建yml文件 在resources包中创建application-dev.yml、application-testing.yml两个yml文件 2.配置 在application.yml进行配置生效文件 3.注意事项 新建yml的名称必须以&qu…...

大话软工笔记—需求分析概述

需求分析&#xff0c;就是要对需求调研收集到的资料信息逐个地进行拆分、研究&#xff0c;从大量的不确定“需求”中确定出哪些需求最终要转换为确定的“功能需求”。 需求分析的作用非常重要&#xff0c;后续设计的依据主要来自于需求分析的成果&#xff0c;包括: 项目的目的…...

ubuntu搭建nfs服务centos挂载访问

在Ubuntu上设置NFS服务器 在Ubuntu上&#xff0c;你可以使用apt包管理器来安装NFS服务器。打开终端并运行&#xff1a; sudo apt update sudo apt install nfs-kernel-server创建共享目录 创建一个目录用于共享&#xff0c;例如/shared&#xff1a; sudo mkdir /shared sud…...

visual studio 2022更改主题为深色

visual studio 2022更改主题为深色 点击visual studio 上方的 工具-> 选项 在选项窗口中&#xff0c;选择 环境 -> 常规 &#xff0c;将其中的颜色主题改成深色 点击确定&#xff0c;更改完成...

srs linux

下载编译运行 git clone https:///ossrs/srs.git ./configure --h265on make 编译完成后即可启动SRS # 启动 ./objs/srs -c conf/srs.conf # 查看日志 tail -n 30 -f ./objs/srs.log 开放端口 默认RTMP接收推流端口是1935&#xff0c;SRS管理页面端口是8080&#xff0c;可…...

【论文笔记】若干矿井粉尘检测算法概述

总的来说&#xff0c;传统机器学习、传统机器学习与深度学习的结合、LSTM等算法所需要的数据集来源于矿井传感器测量的粉尘浓度&#xff0c;通过建立回归模型来预测未来矿井的粉尘浓度。传统机器学习算法性能易受数据中极端值的影响。YOLO等计算机视觉算法所需要的数据集来源于…...

Nginx server_name 配置说明

Nginx 是一个高性能的反向代理和负载均衡服务器&#xff0c;其核心配置之一是 server 块中的 server_name 指令。server_name 决定了 Nginx 如何根据客户端请求的 Host 头匹配对应的虚拟主机&#xff08;Virtual Host&#xff09;。 1. 简介 Nginx 使用 server_name 指令来确定…...

3403. 从盒子中找出字典序最大的字符串 I

3403. 从盒子中找出字典序最大的字符串 I 题目链接&#xff1a;3403. 从盒子中找出字典序最大的字符串 I 代码如下&#xff1a; class Solution { public:string answerString(string word, int numFriends) {if (numFriends 1) {return word;}string res;for (int i 0;i &…...

.Net Framework 4/C# 关键字(非常用,持续更新...)

一、is 关键字 is 关键字用于检查对象是否于给定类型兼容,如果兼容将返回 true,如果不兼容则返回 false,在进行类型转换前,可以先使用 is 关键字判断对象是否与指定类型兼容,如果兼容才进行转换,这样的转换是安全的。 例如有:首先创建一个字符串对象,然后将字符串对象隐…...

腾讯云V3签名

想要接入腾讯云的Api&#xff0c;必然先按其文档计算出所要求的签名。 之前也调用过腾讯云的接口&#xff0c;但总是卡在签名这一步&#xff0c;最后放弃选择SDK&#xff0c;这次终于自己代码实现。 可能腾讯云翻新了接口文档&#xff0c;现在阅读起来&#xff0c;清晰了很多&…...

Python训练营-Day26-函数专题1:函数定义与参数

题目1&#xff1a;计算圆的面积 任务&#xff1a; 编写一个名为 calculate_circle_area 的函数&#xff0c;该函数接收圆的半径 radius 作为参数&#xff0c;并返回圆的面积。圆的面积 π * radius (可以使用 math.pi 作为 π 的值)要求&#xff1a;函数接收一个位置参数 radi…...