当前位置: 首页 > news >正文

PiflowX组件-ReadFromUpsertKafka

ReadFromUpsertKafka组件

组件说明

upsert方式从Kafka topic中读取数据。

计算引擎

flink

有界性

Unbounded

组件分组

kafka

端口

Inport:默认端口

outport:默认端口

组件属性

名称展示名称默认值允许值是否必填描述例子
kafka_hostKAFKA_HOST“”逗号分隔的Kafka broker列表。127.0.0.1:9092
topicTOPIC“”用于写入Kafka topic名称。topic-1
tableDefinitionTableDefinition“”Flink table定义。
key_formatkeyFormat“”Set(“json”, “csv”, “avro”)用于对Kafka消息中key部分反序列化的格式。key字段由PRIMARY KEY语法指定。json
value_formatValueFormat“”Set(“json”, “csv”, “avro”)用于对Kafka消息中value部分反序列化的格式json
value_fields_includeValueFieldsIncludeALLSet(“ALL”, “EXCEPT_KEY”)控制哪些字段应该出现在 value 中。可取值:"ALL:消息的 value 部分将包含 schema 中所有的字段包括定义为主键的字段。"EXCEPT_KEY:记录的 value 部分包含 schema 的所有字段,定义为主键的字段除外。ALL
key_fields_prefixKeyFieldsPrefix“”为所有消息键(Key)格式字段指定自定义前缀,以避免与消息体(Value)格式字段重名。默认情况下前缀为空。 如果定义了前缀,表结构和配置项 ‘key.fields’ 都需要使用带前缀的名称。当构建消息键格式字段时,前缀会被移除, 消息键格式将会使用无前缀的名称。请注意该配置项要求必须将 ‘value.fields-include’ 配置为 ‘EXCEPT_KEY’。
propertiesPROPERTIES“”该选项可以传递任意的 Kafka 参数。选项的后缀名必须匹配定义在 Kafka 参数文档中的参数名。 Flink 会自动移除 选项名中的 “properties.” 前缀,并将转换后的键名以及值传入 KafkaClient。 例如,你可以通过 ‘properties.allow.auto.create.topics’ = ‘false’ 来禁止自动创建 topic。 但是,某些选项,例如’key.deserializer’ 和 ‘value.deserializer’ 是不允许通过该方式传递参数,因为 Flink 会重写这些参数的值。

ReadFromUpsertKafka示例配置

演示实时统计网页pv和uv的总量。

{"flow": {"name": "ReadFromUpsertKafkaTest","uuid": "1234","stops": [{"uuid": "5555","name": "ReadFromUpsertKafka1","bundle": "cn.piflow.bundle.flink.kafka.ReadFromUpsertKafka","properties": {"kafka_host": "hadoop01:9092","topic": "result_total_pv_uv_min","key_format": "json","value_format": "json","value_fields_include": "ALL","tableDefinition": "{\"ifNotExists\":true,\"physicalColumnDefinition\":[{\"columnName\":\"do_date\",\"columnType\":\"STRING\",\"nullable\":false,\"primaryKey\":true,\"partitionKey\":false,\"comment\":\"统计日期\"},{\"columnName\":\"do_min\",\"columnType\":\"STRING\",\"nullable\":false,\"primaryKey\":true,\"partitionKey\":false,\"comment\":\"统计分钟\"},{\"columnName\":\"pv\",\"columnType\":\"BIGINT\",\"nullable\":false,\"primaryKey\":false,\"partitionKey\":false,\"comment\":\"点击量\"},{\"columnName\":\"uv\",\"columnType\":\"BIGINT\",\"nullable\":false,\"primaryKey\":false,\"partitionKey\":false,\"comment\":\"一天内同个访客多次访问仅计算一个UV\"},{\"columnName\":\"currenttime\",\"columnType\":\"TIMESTAMP\",\"nullable\":false,\"primaryKey\":false,\"partitionKey\":false,\"comment\":\"当前时间\"}],\"metadataColumnDefinition\":null,\"computedColumnDefinition\":null}","properties": "{\"value.json.fail-on-missing-field\": false,\"properties.group.id\": \"test\"}"}},{"uuid": "6666","name": "ShowChangeLogData1","bundle": "cn.piflow.bundle.flink.common.ShowChangeLogData","properties": {"showNumber": "5000"}}],"paths": [{"from": "ReadFromUpsertKafka1","outport": "","inport": "","to": "ShowChangeLogData1"}]}
}
示例说明
  1. 通过k.kafka.ReadFromUps从kafka的result_total_pv_uv_min topic中读取数据(使用WriteToUpsertKafka组件写入到result_total_pv_uv_min中的数据);

  2. 通过ShowChangeLogData组件将数据输出到控制台。

tableDefinition属性结构
{"ifNotExists": true,"physicalColumnDefinition": [{"columnName": "do_date","columnType": "STRING","nullable": false,"primaryKey": true,"partitionKey": false,"comment": "统计日期"}, {"columnName": "do_min","columnType": "STRING","nullable": false,"primaryKey": true,"partitionKey": false,"comment": "统计分钟"}, {"columnName": "pv","columnType": "BIGINT","nullable": false,"primaryKey": false,"partitionKey": false,"comment": "点击量"}, {"columnName": "uv","columnType": "BIGINT","nullable": false,"primaryKey": false,"partitionKey": false,"comment": "一天内同个访客多次访问仅计算一个UV"}, {"columnName": "currenttime","columnType": "TIMESTAMP","nullable": false,"primaryKey": false,"partitionKey": false,"comment": "当前时间"}],"metadataColumnDefinition": null,"computedColumnDefinition": null
}

演示DEMO

在这里插入图片描述
欢迎关注PiflowX公众号,谢谢支持!!!

在这里插入图片描述

演示案例参考

实时数仓|以upsert的方式读写Kafka数据—Flink1.12为例_upsert-connect 时间周期-CSDN博客

相关文章:

PiflowX组件-ReadFromUpsertKafka

ReadFromUpsertKafka组件 组件说明 upsert方式从Kafka topic中读取数据。 计算引擎 flink 有界性 Unbounded 组件分组 kafka 端口 Inport:默认端口 outport:默认端口 组件属性 名称展示名称默认值允许值是否必填描述例子kafka_hostKAFKA_HO…...

keil 5 ARM CC编译错误和警告解释大全(3)序列号2000-3000

2001年:已声明虚拟参数,但从未使用过 2002年:虚拟参数重新定义为do变量 2003:无法优化:常量/表达式传递给可能修改的变量 2004:重新维度的数组作为参数传递 2005:重维度数组等价 2006&…...

CentOS 7 实战指南:文件或目录的权限操作命令详解

前言 这篇文章详细介绍了文件和目录的常用权限操作命令,并提供了全面的技术解析。通过本文,你将学习如何使用 chmod 和 chown 命令来管理文件和目录的权限,控制用户和用户组的访问权限。无论你是初学者还是有经验的系统管理员,这…...

我的第一个前端项目,vue项目从零开始创建和运行

​入门前端,从基础做起,从零开始新建项目 背景:VUE脚手架项目是一个“单页面”应用,即整个项目中只有1个网页! 在VUE脚手架项目中,主要是设计各个“视图组件”,它们都是整个网页中某个部分&…...

【OJ】C++,Java,Python,Go,Rust

for循环语法 // cpp// java// python for i in range(集合): for i, val in enumerate(集合): for v1,v2,v3,... in zip(集合1,集合2,集合3,...):Pair // cpp pair<int, string> first second // java Pair<Integer, String> first() new Pair<>(firstVal…...

Flink 任务指标监控

目录 状态监控指标 JobManager 指标 TaskManager 指标 Job 指标 资源监控指标 数据流监控指标 任务监控指标 网络监控指标 容错监控指标 数据源监控指标 数据存储监控指标 当使用 Apache Flink 进行流处理任务时&#xff0c;可以根据不同的监控需求&#xff0c;监控…...

Go语言程序设计-第7章--接口

Go语言程序设计-第7章–接口 接口类型是对其他类型行为的概括与抽象。 Go 语言的接口的独特之处在于它是隐式实现。对于一个具体的类型&#xff0c;无须声明它实现了哪些接口&#xff0c;只要提供接口所必须实现的方法即可。 7.1 接口即约定 7.2 接口类型 package iotype …...

性能优化-OpenMP基础教程(二)

本文主要介绍OpenMP并行编程技术&#xff0c;编程模型、指令和函数的介绍、以及OpenMP实战的几个例子。希望给OpenMP并行编程者提供指导。 &#x1f3ac;个人简介&#xff1a;一个全栈工程师的升级之路&#xff01; &#x1f4cb;个人专栏&#xff1a;高性能&#xff08;HPC&am…...

让电脑变得更聪明——用python实现五子棋游戏

作为经典的棋类游戏&#xff0c;五子棋深受大众喜爱&#xff0c;但如果仅实现人与人的博弈&#xff0c;那程序很简单&#xff0c;如果要实现人机对战&#xff0c;教会计算机如何战胜人类&#xff0c;那就不是十分容易的事了。本文我们先从简单入手&#xff0c;完成五子棋游戏的…...

C#-接口

接口 (interface) 定义了一个可由类和结构实现的协定。接口可以包含方法、属性、事件和索引器。接口不提供它所定义的成员的实现 — 它仅指定实现该接口的类或结构必须提供的成员。 接口可支持多重继承。在下面的示例中,接口 IComboBox 同时从 ITextBox 和 IListBox 继承。 i…...

ASP.NET可视化流程设计器源码

源码介绍: ASP.NET可视化流程设计器源码已应用于众多大型企事业单位。拥有全浏览器兼容的可视化流程设计器、表单设计器、基于角色的权限管理等系统开发必须功能&#xff0c;大大为您节省开发时间&#xff0c;是您开发OA.CRM、HR等企事业各种应用管理系统和工作流系统的最佳基…...

景联文科技GPT教育题库:AI教育大模型的强大数据引擎

GPT-4发布后&#xff0c;美国奥数队总教练、卡耐基梅隆大学数学系教授罗博认为&#xff0c;这个几乎是用“刷题”方式喂大的AI教育大模型的到来&#xff0c;意味着人类的刷题时代即将退出历史舞台。 未来教育将更加注重学生的个性化需求和多元化发展&#xff0c;借助GPT和AI教育…...

PHP进阶-实现网站的QQ授权登录

授权登录是站点开发常见的应用场景&#xff0c;通过社交媒体一键授权可以跳过注册站点账户的繁琐操作。本文将讲解如何用PHP实现QQ授权登录。首先&#xff0c;我们需要申请QQ互联开发者账号获得APPID和密钥&#xff1b;接着&#xff0c;我们下载QQ官方SDK&#xff1a;PHP SDK v…...

字节跳动基础架构SRE-Copilot获得2023 CCF国际AIOps挑战赛冠军

近日&#xff0c;2023 CCF国际AIOps挑战赛决赛暨“大模型时代的AIOps”研讨会在北京成功举办&#xff0c;活动吸引了来自互联网、运营商、科研院所、高校、软硬件厂商等领域多名专家学者参与&#xff0c;为智能运维的前沿学术研究、落地生产实践打开了新思路。决赛中&#xff0…...

python moviepy 图文批量合成带字幕口播视频

最近在研究将图片和文本批量合成为带字幕口播视频 主要是基于python的moviepy库 from generator import audio, pics, subs, videodef main():texts_input examplepics_input example# 图片分辨率预处理pics.adjust(pics_input)# 文字转语音audio.text_to_audio(texts_inpu…...

【代码片段】Linux C++打印当前函数调用堆栈

在开发大型项目时&#xff0c;尤其是多线程情况下&#xff0c;一般无法使用断点调试&#xff0c;这时候将当前函数的调用堆栈打印出来是非常有必要和有效的问题排查手段。 这里记录一段Linux环境下&#xff0c;打印函数堆栈的代码。 void get_native_callstack(std::string &a…...

Linux程序、进程以及计划任务(第一部分)

目录 一、程序和进程 1、什么是程序&#xff1f; 2、什么是进程&#xff1f; 3、线程是什么&#xff1f; 4、如何查看是多线程还是单线程 5、进程结束的两种情况&#xff1a; 6、进程的状态 二、查看进程信息的相关命令 1、ps&#xff1a;查看静态的进程统计信息 2、…...

Oracle database 12cRAC异地恢复至单机

环境 rac 环境 byoradbrac Oracle12.1.0.2 系统版本&#xff1a;Red Hat Enterprise Linux Server release 6.5 软件版本&#xff1a;Oracle Database 12c Enterprise Edition Release 12.1.0.2.0 - 64bit byoradb1&#xff1a;172.17.38.44 byoradb2&#xff1a;172.17.38.4…...

【docker】linux部署docker

简介 首先我需要声明的是&#xff0c;我的系统是centos7&#xff0c;下载工具使用的是yum&#xff1b;在linux上部署docker&#xff0c;之前一直看的是这篇文章Linux之Docker部署&#xff0c;基本上功能方面也都可以使用&#xff0c;部署起来也是比较的简单。首先我先讲述这篇…...

【K8S 云原生】Pod资源限制、Pod容器健康检查(探针)

目录 一、docker的重启方式和K8S重启方式 1、Pod的重启方式&#xff1a; 2、docker的重启策略&#xff1a; 二、yaml文件快速生成&#xff1a; 三、pod的状态&#xff1a; 四、Pod的资源限制 1、限制的方式和种类 2、CPU的限制的格式&#xff1a; 五、K8S拉取镜像的策…...

Python从入门到网络爬虫(模块详解)

模块 我们知道&#xff0c;函数和类都是可以重复调用的代码块。在程序中使用位于不同文件的代码块的方法是&#xff1a;导入 (import) 该对象所在的模块 (mudule)。当程序变得越来越大时&#xff0c;将程序的不同部分根据不同分类方法保存在不同文件中通常会更加方便。 导入模…...

[大厂实践] 无停机迁移大规模关键流量(下)

在系统升级、迁移的过程中&#xff0c;如何验证系统逻辑、性能正确无误&#xff0c;是一个很大的挑战。这一系列介绍了Netflix通过重放流量测试解决这一挑战的实践。原文: Migrating Critical Traffic At Scale with No Downtime — Part 2 想象一下&#xff0c;你被心爱的Netf…...

VMware Workstation虚拟机CentOS 7.9 配置固定ip的步骤

VMware Workstation虚拟机CentOS7.9配置固定ip的步骤 编辑虚拟机 打开VMware Workstation。 选择要配置的虚拟机&#xff0c;但不要启动它。 点击“编辑虚拟机设置”&#xff08;Edit virtual machine settings&#xff09;。 选择“网络适配器”&#xff08;Network Adapter&…...

构建自己的私人GPT

创作不易&#xff0c;请大家多鼓励支持。 在现实生活中&#xff0c;很多人的资料是不愿意公布在互联网上的&#xff0c;但是我们又要使用人工智能的能力帮我们处理文件、做决策、执行命令那怎么办呢&#xff1f;于是我们构建自己或公司的私人GPT变得非常重要。 一、本地部署…...

EtherCAT主站SOEM -- 14 --Qt-Soem通过界面采集从站IO进行显示

EtherCAT主站SOEM -- 14 --Qt-Soem通过界面采集从站IO进行显示 一 mainwindow.c 文件函数:1.1 自定义PDO配置1.2 主站初始化二 motrorcontrol.c 文件三 allvalue.h 文件该文档修改记录:总结一 mainwindow.c 文件函数: 1.1 自定义PDO配置 int IO_setup(uint16 slave) {int...

线程安全、共享变量的可见性

Java中的线程安全问题 谈到线程安全问题&#xff0c;我们先说说什么是共享资源。所谓共享资源&#xff0c;就是说该资源被多个线程所持有或者说多个线程都可以去访问该资源。 线程安全问题是指当多个线程同时读写一个共享资源并且没有任何同步措施时&#xff0c;导致出现脏数…...

电动汽车BMS PCB制板的技术分析与可制造性设计

随着电动汽车行业的迅猛发展&#xff0c;各大厂商纷纷投入巨资进行技术研发和创新。电动汽车的核心之一在于其电池管理系统&#xff08;Battery Management System, BMS&#xff09;&#xff0c;而BMS的心脏则是其印刷电路板&#xff08;PCB&#xff09;。通过这篇文章探讨电动…...

Android 车联网——多屏多用户(十五)

前面几篇文章介绍了多用户和多屏相关的 Manager 和 Service。上一篇文章最后虽然车内乘员都根据配置有自己的对应屏幕,但默认情况下,所有车内乘员依然使用的是当前主用户(司机用户),这一篇我们继续放下看一下用户的创建与分配。 一、用户创建分配 1、创建用户 对于创建用…...

uwsgitop 使用

背景&#xff1a;Django项目 uwsgi&#xff0c;uwsgi.ini 在工程下。 使用&#xff1a; 下载安装uwsgitop [roothost ~]# tar -zxvf uwsgitop-0.11.tar.gz [rootuwsgitop-0.11 ~]# cd uwsgitop-0.11/ [rootuwsgitop-0.11 ~]# python setup.py install [rootuwsgitop-0.11 …...

深信服技术认证“SCSA-S”划重点:文件包含漏洞

为帮助大家更加系统化地学习网络安全知识&#xff0c;以及更高效地通过深信服安全服务认证工程师考核&#xff0c;深信服特别推出“SCSA-S认证备考秘笈”共十期内容&#xff0c;“考试重点”内容框架&#xff0c;帮助大家快速get重点知识~ 划重点来啦 *点击图片放大展示 深信服…...

注重网站建设 把好宣传思想关口/怎样优化网站关键词排名靠前

一、数据成员 termios 函数族提供了一个常规的终端接口&#xff0c;用于控制非同步通信端口。 这个结构包含了至少下列成员&#xff1a;tcflag_t c_iflag; /* 输入模式 */tcflag_t c_oflag; /* 输出模式 */tcflag_t c_cflag; /* 控制模式 */tcflag_t c_lflag; …...

夫唯seo怎么样/第三方关键词优化排名

BUAA-OO-JML JML 概念与 toolchain JML 是一种为 Java 程序设计的、遵循 design by contract 范式的、基于 Hoare Logic 构建的 behaviour interface specification language。它通过使用 Hoare style precondition、postcondition and invariants 为程序员提供了一套规范而清晰…...

北京网站制作案例/长沙网站推广合作

andy在cmu15-445中提出了不要在数据库系统中使用mmap 因为这个,在网络上找了许多关于 mmap 的文章,无一例外都是对 mmap 的褒奖, 对于mmap可能带来的负面影响少之又少,因此整理了一下andy的论文。 andy的论文 大佬的整理 Ravendb‘s ceo的回应 问题引出 通常情况下,数据库…...

简单的网站怎样做/营销技巧和营销方法视频

1 简介&#xff1a;JDK提供的java.util.Properties类继承自Hashtable类并且实现了Map接口&#xff0c;是使用一种键值对的形式来保存属性集&#xff0c;其中键和值都是字符串类型。java.util.Properties类提供了getProperty()和setProperty()方法来操作属性文件&#xff0c;同时…...

沧州网页制作公司/seo工作室

总结 和主成分分析一样&#xff0c;我们可以用因子得分f1和f2作为两个新的变量&#xff0c;来进行后续的建模&#xff08;例如聚类、回归等&#xff09; 注意&#xff1a;因子分析模型不能用于综合评价&#xff0c;尽管有很多论文是这样写的&#xff0c;但这是存在很大的问题…...

已有网站可以做服务器吗/互换链接的方法

首页 > 新闻列表 > 正文发布时间&#xff1a;2020-11-13 07:30:35 浏览&#xff1a; 19导读&#xff1a;青岛东林木业机械为您提供海南全自动抛光机设计安装的相关知识与详情&#xff1a; 表1全国特级原木、等内加工数量万m3年度等内加工原木特级原木根据木材产地的不同…...