b站视频怎么快速推广/优化关键词排名优化公司
DorisStreamLoadObserver
类是一个用于将数据加载到 Doris(以前称为 Palo)数据库中并监视加载过程的 Java 类。该类提供了一组方法,用于构建 HTTP 请求、处理 HTTP 响应以及监控数据加载的状态。以下是每个方法的具体作用:
DorisStreamLoadObserver(Keys options)
: 这是类的构造函数,用于初始化加载数据所需的配置选项。void streamLoad(WriterTuple data) throws Exception
: 该方法是数据加载的主要方法。它将给定的数据(WriterTuple
对象)加载到 Doris 数据库中。它构建了用于将数据发送到 Doris 的 HTTP 请求,并根据响应状态来确定加载是否成功。如果加载失败,它会抛出异常。private void checkStreamLoadState(String host, String label) throws IOException
: 这个方法用于检查数据加载的状态。它会不断地轮询 Doris 服务器,以获取特定加载任务的最终状态。根据加载状态的不同,它可能会抛出异常或者在加载完成时返回。private byte[] addRows(List<byte[]> rows, int totalBytes)
: 此方法根据给定的数据行和总字节数,构建用于加载的字节数组。它根据配置中的数据格式(CSV 或 JSON)将数据行连接起来,并添加适当的分隔符。private Map<String, Object> put(String loadUrl, String label, byte[] data) throws IOException
: 该方法执行 HTTP PUT 请求,将数据加载到 Doris 数据库中。它构建了包含数据的请求实体,发送到指定的加载 URL,并解析响应以获取加载结果。private String getBasicAuthHeader(String username, String password)
: 此方法用于生成基本身份验证头部,以便在 HTTP 请求中进行身份验证。private HttpEntity getHttpEntity(CloseableHttpResponse response)
: 这是一个实用方法,用于从 HTTP 响应中提取实体内容。private String getLoadHost()
: 该方法从配置选项中获取用于加载数据的主机地址列表,并尝试连接到这些主机以检查其可用性。它会返回第一个可用的主机地址。
DorisStreamLoadObserver
类主要用于处理数据加载任务,它负责构建适当的 HTTP 请求,将数据发送到 Doris 数据库,并监控加载任务的状态。通过这些方法,可以实现将数据从外部系统加载到 Doris 数据库中,并在加载过程中进行必要的状态检查和错误处理。
import org.apache.commons.codec.binary.Base64;
import org.apache.http.HttpEntity;
import org.apache.http.HttpHeaders;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpPut;
import org.apache.http.entity.ByteArrayEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.DefaultRedirectStrategy;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.util.EntityUtils;
import org.json.simple.JSONValue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;public class DorisStreamLoadObserver {private static final Logger LOG = LoggerFactory.getLogger(DorisStreamLoadObserver.class);private Keys options;private long pos;private static final String RESULT_FAILED = "Fail";private static final String RESULT_LABEL_EXISTED = "Label Already Exists";private static final String LAEBL_STATE_VISIBLE = "VISIBLE";private static final String LAEBL_STATE_COMMITTED = "COMMITTED";private static final String RESULT_LABEL_PREPARE = "PREPARE";private static final String RESULT_LABEL_ABORTED = "ABORTED";private static final String RESULT_LABEL_UNKNOWN = "UNKNOWN";public DorisStreamLoadObserver(Keys options) {this.options = options;}// 数据写入 Doris 的主要方法public void streamLoad(WriterTuple data) throws Exception {String host = getLoadHost();if (host == null) {throw new IOException("load_url cannot be empty, or the host cannot connect. Please check your configuration.");}String loadUrl = new StringBuilder(host).append("/api/").append(options.getDatabase()).append("/").append(options.getTable()).append("/_stream_load").toString();LOG.info("Start to join batch data: rows[{}] bytes[{}] label[{}].", data.getRows().size(), data.getBytes(), data.getLabel());Map<String, Object> loadResult = put(loadUrl, data.getLabel(), addRows(data.getRows(), data.getBytes().intValue()));LOG.info("StreamLoad response :{}", JSONValue.toJSONString(loadResult));final String keyStatus = "Status";if (null == loadResult || !loadResult.containsKey(keyStatus)) {throw new IOException("Unable to flush data to Doris: unknown result status.");}LOG.debug("StreamLoad response:{}", JSONValue.toJSONString(loadResult));if (RESULT_FAILED.equals(loadResult.get(keyStatus))) {throw new IOException(new StringBuilder("Failed to flush data to Doris.\n").append(JSONValue.toJSONString(loadResult)).toString());} else if (RESULT_LABEL_EXISTED.equals(loadResult.get(keyStatus))) {LOG.debug("StreamLoad response:{}", JSONValue.toJSONString(loadResult));checkStreamLoadState(host, data.getLabel());}}// 检查数据加载状态的方法private void checkStreamLoadState(String host, String label) throws IOException {int idx = 0;while (true) {try {TimeUnit.SECONDS.sleep(Math.min(++idx, 5));} catch (InterruptedException ex) {break;}try (CloseableHttpClient httpclient = HttpClients.createDefault()) {HttpGet httpGet = new HttpGet(new StringBuilder(host).append("/api/").append(options.getDatabase()).append("/get_load_state?label=").append(label).toString());httpGet.setHeader("Authorization", getBasicAuthHeader(options.getUsername(), options.getPassword()));httpGet.setHeader("Connection", "close");try (CloseableHttpResponse resp = httpclient.execute(httpGet)) {HttpEntity respEntity = getHttpEntity(resp);if (respEntity == null) {throw new IOException(String.format("Failed to flush data to Doris, Error " +"could not get the final state of label[%s].\n", label), null);}Map<String, Object> result = (Map<String, Object>) JSONValue.parse(EntityUtils.toString(respEntity));String labelState = (String) result.get("state");if (null == labelState) {throw new IOException(String.format("Failed to flush data to Doris, Error " +"could not get the final state of label[%s]. response[%s]\n", label, EntityUtils.toString(respEntity)), null);}LOG.info(String.format("Checking label[%s] state[%s]\n", label, labelState));switch (labelState) {case LAEBL_STATE_VISIBLE:case LAEBL_STATE_COMMITTED:return;case RESULT_LABEL_PREPARE:continue;case RESULT_LABEL_ABORTED:throw new DorisWriterExcetion(String.format("Failed to flush data to Doris, Error " +"label[%s] state[%s]\n", label, labelState), null, true);case RESULT_LABEL_UNKNOWN:default:throw new IOException(String.format("Failed to flush data to Doris, Error " +"label[%s] state[%s]\n", label, labelState), null);}}}}}// 根据格式将数据行拼接成字节数组private byte[] addRows(List<byte[]> rows, int totalBytes) {if (Keys.StreamLoadFormat.CSV.equals(options.getStreamLoadFormat())) {Map<String, Object> props = (options.getLoadProps() == null ? new HashMap<>() : options.getLoadProps());byte[] lineDelimiter = DelimiterParser.parse((String) props.get("line_delimiter"), "\n").getBytes(StandardCharsets.UTF_8);ByteBuffer bos = ByteBuffer.allocate(totalBytes + rows.size() * lineDelimiter.length);for (byte[] row : rows) {bos.put(row);bos.put(lineDelimiter);}return bos.array();}if (Keys.StreamLoadFormat.JSON.equals(options.getStreamLoadFormat())) {ByteBuffer bos = ByteBuffer.allocate(totalBytes + (rows.isEmpty() ? 2 : rows.size() + 1));bos.put("[".getBytes(StandardCharsets.UTF_8));byte[] jsonDelimiter = ",".getBytes(StandardCharsets.UTF_8);boolean isFirstElement = true;for (byte[] row : rows) {if (!isFirstElement) {bos.put(jsonDelimiter);}bos.put(row);isFirstElement = false;}bos.put("]".getBytes(StandardCharsets.UTF_8));return bos.array();}throw new RuntimeException("Failed to join rows data, unsupported `format` from stream load properties:");}private Map<String, Object> put(String loadUrl, String label, byte[] data) throws IOException {RequestConfig requestConfig = RequestConfig.custom().setSocketTimeout(120 * 1000).setConnectTimeout(120 * 1000).setConnectionRequestTimeout(120 * 1000).build();try (CloseableHttpClient httpclient = HttpClientBuilder.create().setDefaultRequestConfig(requestConfig).setRedirectStrategy(new DefaultRedirectStrategy()).build()) {HttpPut httpPut = new HttpPut(loadUrl);httpPut.setHeader(HttpHeaders.CONTENT_TYPE, "application/octet-stream");httpPut.setHeader("Authorization", getBasicAuthHeader(options.getUsername(), options.getPassword()));httpPut.setEntity(new ByteArrayEntity(data));try (CloseableHttpResponse resp = httpclient.execute(httpPut)) {HttpEntity respEntity = getHttpEntity(resp);if (respEntity == null) {throw new IOException("Failed to flush data to Doris, Error could not get the response entity.");}return (Map<String, Object>) JSONValue.parse(EntityUtils.toString(respEntity));}}}// 构造 HTTP 请求中的基本认证头部private String getBasicAuthHeader(String username, String password) {String credentials = username + ":" + password;byte[] credentialsBytes = credentials.getBytes(StandardCharsets.UTF_8);String base64Credentials = Base64.encodeBase64String(credentialsBytes);return "Basic " + base64Credentials;}// 从 HTTP 响应中获取实体内容private HttpEntity getHttpEntity(CloseableHttpResponse response) {if (response != null) {return response.getEntity();}return null;}// 获取用于加载数据的主机地址private String getLoadHost() {List<String> hosts = options.getDorisStreamLoadUrls();for (String host : hosts) {try {HttpURLConnection connection = (HttpURLConnection) new URL(host).openConnection();connection.setRequestMethod("HEAD");int responseCode = connection.getResponseCode();if (responseCode == HttpURLConnection.HTTP_OK) {return host;}} catch (IOException e) {LOG.warn("Failed to connect to host: {}", host);}}return null;}
}
相关文章:

DataX DorisWriter 插件DorisStreamLoadObserver类详细解读
DorisStreamLoadObserver 类是一个用于将数据加载到 Doris(以前称为 Palo)数据库中并监视加载过程的 Java 类。该类提供了一组方法,用于构建 HTTP 请求、处理 HTTP 响应以及监控数据加载的状态。以下是每个方法的具体作用: Doris…...

leetcode:1710. 卡车上的最大单元数(python3解法)
难度:简单 请你将一些箱子装在 一辆卡车 上。给你一个二维数组 boxTypes ,其中 boxTypes[i] [numberOfBoxesi, numberOfUnitsPerBoxi] : numberOfBoxesi 是类型 i 的箱子的数量。numberOfUnitsPerBoxi 是类型 i 每个箱子可以装载的单元数量。…...

Spring_JDBC的使用
Spring 是个一站式框架:Spring 自身也提供了控制层的 SpringMVC和持久层的 Spring JdbcTemplate。 配置信息 1.下载 Spring JdbcTemplate 的 jar 包,在pom.xml中导入 <dependency><groupId>org.springframework</groupId><artifactId>spr…...

【Python从入门到进阶】34、selenium基本概念及安装流程
接上篇《33、使用bs4获取星巴克产品信息》 上一篇我们介绍了如何使用bs4来解析星巴克网站,获取其产品信息。本篇我们来了解selenium技术的基础。 一、什么是selenium? Selenium是一种用于自动化Web浏览器操作的开源工具。它提供了一组API(应…...

如何确保ChatGPT在文本生成中遵循道德和伦理准则?
确保ChatGPT在文本生成中遵循道德和伦理准则是一个复杂而重要的任务。人工智能(AI)系统,特别是语言模型,具有强大的生成能力,但如果不受到道德和伦理准则的约束,可能会导致一系列问题,包括歧视、…...

RISC-V Linux系统rootfs制作
文章目录 1、下载2、配置与编译3、运行 buildroot 是一个构建嵌入式Linux系统的框架。整个 buildroot 是由Makefile(*.mk) 脚本和 Kconfig(Config.in) 配置文件构成的,因此可以像配置 Linux 内核一样执行 make menuconfig 进行配置,编译出一个完整的、可…...

git常用场景记录 | 拉取远程分支A合并到本地分支B
文章目录 git常用场景记录拉取远程分支A合并到本地分支B本地分支B存在未add与commit的代码 git常用场景记录 doing,最后更新9.1 拉取远程分支A合并到本地分支B 需求描述 在团队合作时,我自己的本地分支B功能已经实现并合并到feature,之后发现…...

如何利用Linux进行数据管理和分析?
Linux是一款非常强大的操作系统,它不仅可以帮助你管理数据,还可以让你成为一名数据分析大师。只要你会使用命令行,你就可以用Linux进行数据管理和分析。 现在,让我们来看看如何使用Linux进行数据管理。 使用sort命令对数据进行排…...

vue3封装echarts图表数据无法渲染到页面
问题是后端的数据已经成功返回到前端了,但是Echarts图表一直不能被渲染,卡了一个多小时,最后问gpt才解决(gptyyds!!!) methods: {loadGet() {this.$axios.get(this.$httpUrl /goods…...

MySQL索引,事务和存储引擎
一、索引 1、索引的概念 ●索引是一个排序的列表,在这个列表中存储着索引的值和包含这个值的数据所在行的物理地址(类似于C语言的链表通过指针指向数据记录的内存地址)。 ●使用索引后可以不用扫描全表来定位某行的数据,而是先…...

开发指导—利用CSS动画实现HarmonyOS动效(一)
注:本文内容分享转载自 HarmonyOS Developer 官网文档 一. CSS 语法参考 CSS 是描述 HML 页面结构的样式语言。所有组件均存在系统默认样式,也可在页面 CSS 样式文件中对组件、页面自定义不同的样式。请参考通用样式了解兼容 JS 的类 Web 开发范式支持的…...

电商项目part10 高并发缓存实战
缓存的数据一致性 只要使用到缓存,无论是本地内存做缓存还是使用 redis 做缓存,那么就会存在数据同步的问题。 先读缓存数据,缓存数据有,则立即返回结果;如果没有数据,则从数据库读数据,并且把…...

MongoDB实验——MongoDB shell操作
MongoDB shell操作 实验原理 MongoDB shell是一个可执行文件,是MongoDB自带的一个交互式JavaScript shell,位于MongoDB安装路径下的/bin文件夹中。要启动MongoDB shell,可执行命令mongo。这将在控制台提示符中启动该shell,Mongo…...

数据分析师职业发展道路,工作内容是什么?
很多同学问,参加数据分析就业班后之的就业发展道路是怎样的,工作又能做什么呢? 市面上的常见的工作类型有有运营类、技术类及分析类等,可以根据自己的意愿去做适合自己的工作,但是任何工作其实都是需要一技之长。…...

Vue3 + ts的使用
一. IDE的配置 1. VSCode 插件安装搜索builtin typescript 2. 点击“TypeScript and JavaScript Language Features”右下角的小齿轮,然后选择“Disable (Workspace)” 3. 重新加载工作空间。Takeover 模式将会在你打开一个 Vue 或者 TS 文件时自动启用。 二. 依赖的…...

CF Edu152 C
Problem - C - Codeforces 题意: 思路: 首先,观察样例可知 这种是等效的 推广一下 0000.....111111 ..l..............r...... 这种是等效的 容易想到维护后面第一个1的位置和前面第一个0的位置,然后把所有区间都等效一下&…...

iBooker 技术评论 20230902
一、女子同时供职 16 家公司却从不上班,全国骗薪群体至少有七八百人,为何会出现此类骗薪群体? 社保其实很好绕过。就是这些骗薪者一起创立一个外包公司,然后通过这个公司把自己外包出去。这些人和外包公司签的是劳务合同…...

视频动态壁纸 Dynamic Wallpaper for Mac中文
Dynamic Wallpaper是一款Mac平台上的动态壁纸应用程序,它可以根据时间等因素动态切换壁纸,提供更加生动和多样化的桌面体验。 Dynamic Wallpaper包含了多个动态壁纸,用户可以根据自己的喜好选择和切换。这些动态壁纸可以根据时间等因素进行自…...

Java“牵手”京东商品列表数据,关键词搜索京东商品数据接口,京东API申请指南
京东商城是一个网上购物平台,售卖各类商品,包括服装、鞋类、家居用品、美妆产品、电子产品等。要获取京东商品列表和商品详情页面数据,您可以通过开放平台的接口或者直接访问京东商城的网页来获取商品详情信息。以下是两种常用方法的介绍&…...

springboot实战(三)之多环境部署配置文件生效方式
环境: jdk:1.8 springboot版本:2.7.15 配置: 1.新建yml文件 在resources包中创建application-dev.yml、application-testing.yml两个yml文件 2.配置 在application.yml进行配置生效文件 3.注意事项 新建yml的名称必须以&qu…...

java透传参数至logback,自定义日志文件名。过期日志文件自动删除
LogFilter filter日志拦截,把不需要打印的日志信息拦截在外,只录入有key参数的(filterReply FilterReply.ACCEPT;)。 package com.***.***.filter;import ch.qos.logback.classic.Level; import ch.qos.logback.classic.spi.IL…...

HFSS 3维曲线导入
HFSS 3维曲线导入 简介环境参考代码使用结果 简介 如图一所示,CST中可以通过导入和到出由任意点组成的曲线,但是HFSS中貌似不能导入(如图二所示),如果我们要将matlab的产生的曲线的点的数据导入特变麻烦,特…...

【消息中心】kafka消费失败重试10次的问题
Kafka消费失败重试10次的问题通常可以通过配置Kafka消费者来调整。在Kafka中,可以通过设置max.poll.interval.ms、fetch.min.bytes、fetch.max.bytes、fetch.max.wait.ms等参数来控制消费者的拉取消息的行为。 在Spring-Kafka中,消费失败的重试次数可以…...

无涯教程-Python机器学习 - Semi-supervised Learning函数
Python机器学习 中的 Semi - 无涯教程网无涯教程网提供https://www.learnfk.com/python-machine-learning/machine-learning-with-python-semi-supervised-learning.html...

7 | 计算每个键对应的平均值,并按降序排序
假设您有一个包含销售订单的RDD,其中每个元素是一个键值对,其中键表示产品名称,值表示销售数量。您希望按产品名称对销售订单进行分组,并计算每个产品的总销售数量。最后,希望获得每个产品的总销售数量以及按产品名称分组的详细销售订单列表。 计算每个键对应的总和和计数…...

kafka详解二
kafka详解二 1、 offset 1.1 offset介绍 老版本 Consumer 的位移管理是依托于 Apache ZooKeeper 的,它会自动或手动地将位移数据提交到 ZooKeeper 中保存。当 Consumer 重启后,它能自动从 ZooKeeper 中读取位移数据,从而在上次消费截止的地…...

SAP_ABAP_接口技术_RFC远程函数实践总结
SAP ABAP顾问能力模型梳理_企业数字化建设者的博客-CSDN博客SAP Abap顾问能力模型,ALV/REPORT|SMARTFROM|SCREEN|OLE|BAPI|BDC|PI|IDOC|RFC|API|WEBSERVICE|Enhancement|UserExits|Badi|Debughttps://blog.csdn.net/java_zhong1990/article/details/132469977 SAP接…...

计算机 --> 磁盘 --> 分区
一、分区;步骤较完整,未测试 网址:电脑硬盘怎么分区?C盘/D盘/E盘......快来创建自己的DIY磁盘吧!_e盘怎么创建_布 迪的博客-CSDN博客...

3D视觉测量:形位公差 平面度测量(附源码)
文章目录 0. 测试效果1. 基本内容2. 实现方法3. 代码实现4. 参考文章目录:3D视觉测量目录微信:dhlddxB站: Non-Stop_0. 测试效果 1. 基本内容 平面度是一个表达平面平整程度的度量指标,它描述了一个表面与一个理想平面之间的偏差程度。在工程和制造领域,平面度是一个重要的…...

vmware虚拟机远程开发
目录 1. 下载vmware2. 下载ubuntu镜像3. 安装4. 做一些设置4.1 分辨率设置4.2 语言下载4.3 输入法设置4.4 时区设置 5. 直接切换管理员权限6. 网络6.1 看ip6.2 ssh 7. 本地编译器连接远程服务器7.1 创建远程部署的配置7.2 文件同步7.3 远程启动项目 8. ubuntu安装golang环境8.1…...