PiflowX组件-ReadFromKafka
ReadFromKafka组件
组件说明
从kafka中读取数据。
计算引擎
flink
有界性
Unbounded
组件分组
kafka
端口
Inport:默认端口
outport:默认端口
组件属性
| 名称 | 展示名称 | 默认值 | 允许值 | 是否必填 | 描述 | 例子 |
|---|---|---|---|---|---|---|
| kafka_host | KAFKA_HOST | “” | 无 | 是 | 逗号分隔的Kafka broker列表。 | 127.0.0.1:9092 |
| topic | TOPIC | “” | 无 | 否 | 读取数据的topic名。亦支持用分号间隔的topic列表,如 ‘topic-1;topic-2’。" "注意,‘topic’ 和 ‘topic-pattern’ 两个选项只能使用其中一个。 | topic-1 |
| topic_pattern | TOPIC_PATTERN | “” | 无 | 否 | 匹配读取topic名称的正则表达式。在作业开始运行时,所有匹配该正则表达式的topic都将被Kafka consumer订阅。注意,‘topic’ 和 ‘topic-pattern’ 两个选项只能使用其中一个。 | topic1_* |
| startup_mode | STARTUP_MODE | “” | Set(“earliest-offset”, “latest-offset”, “group-offsets”, “timestamp”, “specific-offsets”) | 否 | Kafka consumer 的启动模式。 | earliest-offset |
| schema | SCHEMA | “” | 无 | 否 | Kafka消息的schema信息。 | id:int,name:string,age:int |
| format | FORMAT | “” | Set(“json”, “csv”, “avro”, “parquet”, “orc”, “raw”, “protobuf”,“debezium-json”, “canal-json”, “maxwell-json”, “ogg-json”) | 是 | 用来反序列化Kafka消息的格式。注意:该配置项和 ‘value.format’ 二者必需其一。 | json |
| group | GROUP | “” | 无 | 否 | Kafka source的消费组id。如果未指定消费组ID,则会使用自动生成的"KafkaSource-{tableIdentifier}"作为消费组ID。 | group_1 |
| properties | PROPERTIES | “” | 无 | 否 | Kafka source连接器其他配置 |
ReadFromKafka示例配置
{"flow": {"name": "DataGenTest","uuid": "1234","stops": [{"uuid": "0000","name": "DataGen1","bundle": "cn.piflow.bundle.flink.common.DataGen","properties": {"schema": "[{\"filedName\":\"id\",\"filedType\":\"INT\",\"kind\":\"sequence\",\"start\":1,\"end\":10000},{\"filedName\":\"name\",\"filedType\":\"STRING\",\"kind\":\"random\",\"length\":15},{\"filedName\":\"age\",\"filedType\":\"INT\",\"kind\":\"random\",\"max\":100,\"min\":1}]","count": "100","ratio": "5"}},{"uuid": "1111","name": "WriteToKafka1","bundle": "cn.piflow.bundle.flink.kafka.WriteToKafka","properties": {"kafka_host": "hadoop01:9092","topic": "test","schema": "","format": "json","properties": "{}"}},{"uuid": "2222","name": "ReadFromKafka1","bundle": "cn.piflow.bundle.flink.kafka.ReadFromKafka","properties": {"kafka_host": "hadoop01:9092","topic": "test","group": "test","startup_mode": "earliest-offset","schema": "id:int,name:string,age:int","format": "json","properties": "{}"}},{"uuid": "3333","name": "ShowData1","bundle": "cn.piflow.bundle.flink.common.ShowData","properties": {"showNumber": "5000"}}],"paths": [{"from": "DataGen1","outport": "","inport": "","to": "WriteToKafka1"},{"from": "WriteToKafka1","outport": "","inport": "","to": "ReadFromKafka1"},{"from": "ReadFromKafka1","outport": "","inport": "","to": "ShowData1"}]}
}
示例说明
本示例演示了通过DataGen组件生成id,name,age3个字段100条数据,每秒生成5条数据,通过WriteToKafka组件将数据写入到kafka的test topic中,然后通过ReadFromKafka组件从test topic中读取数据,最后使用ShowData组件将数据打印在控制台。
字段描述
[{ "filedName": "id","filedType": "INT","kind": "sequence","start": 1,"end": 10000},{ "filedName": "name","filedType": "STRING","kind": "random","length": 15},{ "filedName": "age","filedType": "INT","kind": "random","max": 100,"min": 1}
]
1.id字段
id字段类型为INT,使用sequence生成器,序列生成器的起始值为1,结束值为10000.
2.name字段
name字段类型为STRING,使用random生成器,生成字符长度为15。
3.age字段
age字段类型为INT,使用random生成器,随机生成器的最小值为1,最大值为100。

相关文章:
PiflowX组件-ReadFromKafka
ReadFromKafka组件 组件说明 从kafka中读取数据。 计算引擎 flink 有界性 Unbounded 组件分组 kafka 端口 Inport:默认端口 outport:默认端口 组件属性 名称展示名称默认值允许值是否必填描述例子kafka_hostKAFKA_HOST“”无是逗号分隔的Ka…...
Ubuntu 安装MySQL以及基本使用
前言 MySQL是一个开源数据库管理系统,通常作为流行的LAMP(Linux,Apache,MySQL,PHP / Python / Perl)堆栈的一部分安装。它使用关系数据库和SQL(结构化查询语言)来管理其数据。 安装…...
基于Freeswitch实现的Volte网视频通知应用
现在运营商的Volte网络已经很好的支持视频通话了,因此在原来的电话语音通知的基础上,可以更进一步实现视频的通知,让用户有更好的体验,本文就从技术角度,基于Freeswitch来实现此类应用(本文假设读者已对Fre…...
怎么实现Servlet的自动加载
在实际开发时,有时候会希望某些Servlet程序可以在Tomcat启动时随即启动。但在默认情况下,第一次访问servlet的时候,才创建servlet对象。 如果servlet构造函数里面的代码或者init方法里面的代码比较多,就会导致用户第一次访问serv…...
15. Mysql 变量的使用
目录 变量的概述自定义变量系统变量查看系统变量系统变量赋值 局部变量总结参考资料 变量的概述 MySQL支持不同类型的变量,包括自定义变量、系统变量和局部变量。自定义变量是在会话中定义的变量,用于存储临时数据。系统变量是MySQL服务器提供的全局变量…...
为什么ChatGPT采用SSE协议而不是Websocket?
在探索ChatGPT的使用过程中,我们发现GPT采用了流式数据返回的方式。理论上,这种情况可以通过全双工通信协议实现持久化连接,或者依赖于基于EventStream的事件流。然而,ChatGPT选择了后者,也就是本文即将深入探讨的SSE&…...
Elasticsearch:使用 ELSER v2 文本扩展进行语义搜索
Elastic 提供了一个强大的 ELSER 供我们进行语义搜索。ELSER 是一种稀疏向量的搜索方法。我们无需对它做任何的微调及训练。它是一种 out-of-domain 的模型。目前它仅对英文进行支持。希望将来它能对其它的语言支持的更好。更多关于 ELSER 的知识,请参阅文章 “Elas…...
Matlab:BP神经网络算法,二叉决策树
1、BP神经网络算法 (1)步骤 1.准备训练数据和目标值 2.创建并配置BP神经网络模型 3.训练BP神经网络模型 4.用BP神经网络模型预测数据 例:某企业第一年度营业额为132468,第二年度为158948,第三年度为183737,预测第四年度的营…...
Python实现员工管理系统(Django页面版 ) 七
各位小伙伴们好久不见,2024年即将到来,小编在这里提前祝大家新的一年快快乐乐,能够事业有成,学习顺心,家庭和睦,事事顺利。 今天我们本篇要实现的是一个登录界面的实现,其实登录界面的实现看着挺…...
听GPT 讲Rust源代码--src/tools(34)
File: rust/src/tools/clippy/clippy_lints/src/collection_is_never_read.rs 文件"collection_is_never_read.rs"位于Rust源代码中的clippy_lints工具中,其作用是检查在集合类型(如Vec、HashMap等)的实例上执行的操作是否被忽略了…...
k8s的陈述式资源管理(命令行操作)
(一)k8s的陈述式资源管理 1、命令行:kubectl命令行工具——用于一般的资源管理 (1)优点:90%以上ce场景都可以满足 (2)特点:对资源的增、删、查比较方便,对…...
uniapp uview裁剪组件源码修改(u-avatar-cropper),裁出可自定义固定大小图片
u-avatar-cropper修改后 <template><view class"index"><!-- {{userinfo}} --><view class"top"><view class"bg"><image src"../../static/electronic_card/bg.png"></image></view&g…...
【机器学习前置知识】Beta分布
Beta分布与二项分布的关系 Beta分布与二项分布密切相关,由二项分布扩展而来,它是用来描述一个连续型随机变量出现的概率的概率密度分布,表示为 X X X~ B e t a ( a , b ) Beta(a,b) Beta(a,b) , a 、 b a、b a、b 是形状参数。Beta分布本质上也是一个概率密度函数,只是这…...
Notepad++批量更改文件编码格式及文档格式
背景: 在项目中遇到Windows平台VS的MSVC编译不识别Unix下UTF-8编码导致的编译失败问题。需要将Unix下的UTF-8转为UTF-8-BOM格式。网上找了些方式,之后又深入探究了下文档转换的可能性,共享给大家。(当然Windows和Unix平台代码格式…...
Linux驱动开发学习笔记6《蜂鸣器实验》
目录 一、蜂鸣器驱动原理 二、硬件原理分析 三、实验程序编写 1、 修改设备树文件 (1)添加pinctrl节点 (2)添加BEEP设备节点 (3)检查PIN 是否被其他外设使用 2、蜂鸣器驱动程序编写 3、编写测试AP…...
鸿蒙(HarmonyOS 3.1) DevEco Studio 3.1开发环境汉化
鸿蒙(HarmonyOS 3.1) DevEco Studio 3.1开发环境汉化 一、安装环境 操作系统: Windows 10 专业版 IDE:DevEco Studio 3.1 SDK:HarmonyOS 3.1 二、设置过程 打开IDE,在第一个菜单File 中找到Settings...菜单 在Setting...中找到Plugins…...
毫米波雷达:从 3D 走向 4D
1 毫米波雷达已广泛应用于汽车 ADAS 系统 汽车智能驾驶需要感知层、决策层、执行层三大核心系统的高效配合,其中感知层通过传感器探知周围的环境。汽车智能驾驶感知层将真实世界的视觉、物理、事件等信息转变成数字信号,为车辆了解周边环境、制定驾驶操…...
CENTOS docker拉取私服镜像
概述 docker的应用越来越多,安装部署越来越方便,批量自动化的镜像生成和发布都需要docker镜像的拉取。 centos6版本太老,docker的使用过程中问题较多,centos7相对简单容易。 本文档主要介绍centos系统安装docker和拉取docker私…...
【前端面经】即时设计
目录 前言一面git 常见命令跨窗口通信vue 响应式原理发布订阅模式翻转二叉树Promise.all()扁平化数组面试官建议 二面Event Loop 原理Promise 相关css 描边方式requestAnimationReact 18 新特性JSX 相关react 输出两次函数式编程React 批处理机制http请求头有哪些本地存储性能优…...
前端三件套html/css/js的基本认识以及示例程序
简介 本文简要讲解了html,css,js.主要是让大家简要了解网络知识 因为实际开发中很少直接写html&css,所以不必过多纠结,了解一下架构就好 希望深度学习可以参考MDN和w3school HTML 基础 HTML (Hyper Text Markup Language) 不是一门编程语言,而是一种用来告知浏览器如…...
观成科技:隐蔽隧道工具Ligolo-ng加密流量分析
1.工具介绍 Ligolo-ng是一款由go编写的高效隧道工具,该工具基于TUN接口实现其功能,利用反向TCP/TLS连接建立一条隐蔽的通信信道,支持使用Let’s Encrypt自动生成证书。Ligolo-ng的通信隐蔽性体现在其支持多种连接方式,适应复杂网…...
1.3 VSCode安装与环境配置
进入网址Visual Studio Code - Code Editing. Redefined下载.deb文件,然后打开终端,进入下载文件夹,键入命令 sudo dpkg -i code_1.100.3-1748872405_amd64.deb 在终端键入命令code即启动vscode 需要安装插件列表 1.Chinese简化 2.ros …...
在Ubuntu中设置开机自动运行(sudo)指令的指南
在Ubuntu系统中,有时需要在系统启动时自动执行某些命令,特别是需要 sudo权限的指令。为了实现这一功能,可以使用多种方法,包括编写Systemd服务、配置 rc.local文件或使用 cron任务计划。本文将详细介绍这些方法,并提供…...
相机从app启动流程
一、流程框架图 二、具体流程分析 1、得到cameralist和对应的静态信息 目录如下: 重点代码分析: 启动相机前,先要通过getCameraIdList获取camera的个数以及id,然后可以通过getCameraCharacteristics获取对应id camera的capabilities(静态信息)进行一些openCamera前的…...
ServerTrust 并非唯一
NSURLAuthenticationMethodServerTrust 只是 authenticationMethod 的冰山一角 要理解 NSURLAuthenticationMethodServerTrust, 首先要明白它只是 authenticationMethod 的选项之一, 并非唯一 1 先厘清概念 点说明authenticationMethodURLAuthenticationChallenge.protectionS…...
解决本地部署 SmolVLM2 大语言模型运行 flash-attn 报错
出现的问题 安装 flash-attn 会一直卡在 build 那一步或者运行报错 解决办法 是因为你安装的 flash-attn 版本没有对应上,所以报错,到 https://github.com/Dao-AILab/flash-attention/releases 下载对应版本,cu、torch、cp 的版本一定要对…...
Matlab | matlab常用命令总结
常用命令 一、 基础操作与环境二、 矩阵与数组操作(核心)三、 绘图与可视化四、 编程与控制流五、 符号计算 (Symbolic Math Toolbox)六、 文件与数据 I/O七、 常用函数类别重要提示这是一份 MATLAB 常用命令和功能的总结,涵盖了基础操作、矩阵运算、绘图、编程和文件处理等…...
Mac下Android Studio扫描根目录卡死问题记录
环境信息 操作系统: macOS 15.5 (Apple M2芯片)Android Studio版本: Meerkat Feature Drop | 2024.3.2 Patch 1 (Build #AI-243.26053.27.2432.13536105, 2025年5月22日构建) 问题现象 在项目开发过程中,提示一个依赖外部头文件的cpp源文件需要同步,点…...
技术栈RabbitMq的介绍和使用
目录 1. 什么是消息队列?2. 消息队列的优点3. RabbitMQ 消息队列概述4. RabbitMQ 安装5. Exchange 四种类型5.1 direct 精准匹配5.2 fanout 广播5.3 topic 正则匹配 6. RabbitMQ 队列模式6.1 简单队列模式6.2 工作队列模式6.3 发布/订阅模式6.4 路由模式6.5 主题模式…...
08. C#入门系列【类的基本概念】:开启编程世界的奇妙冒险
C#入门系列【类的基本概念】:开启编程世界的奇妙冒险 嘿,各位编程小白探险家!欢迎来到 C# 的奇幻大陆!今天咱们要深入探索这片大陆上至关重要的 “建筑”—— 类!别害怕,跟着我,保准让你轻松搞…...
