当前位置: 首页 > news >正文

PiflowX组件-ReadFromKafka

ReadFromKafka组件

组件说明

从kafka中读取数据。

计算引擎

flink

有界性

Unbounded

组件分组

kafka

端口

Inport:默认端口

outport:默认端口

组件属性

名称展示名称默认值允许值是否必填描述例子
kafka_hostKAFKA_HOST“”逗号分隔的Kafka broker列表。127.0.0.1:9092
topicTOPIC“”读取数据的topic名。亦支持用分号间隔的topic列表,如 ‘topic-1;topic-2’。" "注意,‘topic’ 和 ‘topic-pattern’ 两个选项只能使用其中一个。topic-1
topic_patternTOPIC_PATTERN“”匹配读取topic名称的正则表达式。在作业开始运行时,所有匹配该正则表达式的topic都将被Kafka consumer订阅。注意,‘topic’ 和 ‘topic-pattern’ 两个选项只能使用其中一个。topic1_*
startup_modeSTARTUP_MODE“”Set(“earliest-offset”, “latest-offset”, “group-offsets”, “timestamp”, “specific-offsets”)Kafka consumer 的启动模式。earliest-offset
schemaSCHEMA“”Kafka消息的schema信息。id:int,name:string,age:int
formatFORMAT“”Set(“json”, “csv”, “avro”, “parquet”, “orc”, “raw”, “protobuf”,“debezium-json”, “canal-json”, “maxwell-json”, “ogg-json”)用来反序列化Kafka消息的格式。注意:该配置项和 ‘value.format’ 二者必需其一。json
groupGROUP“”Kafka source的消费组id。如果未指定消费组ID,则会使用自动生成的"KafkaSource-{tableIdentifier}"作为消费组ID。group_1
propertiesPROPERTIES“”Kafka source连接器其他配置

ReadFromKafka示例配置

{"flow": {"name": "DataGenTest","uuid": "1234","stops": [{"uuid": "0000","name": "DataGen1","bundle": "cn.piflow.bundle.flink.common.DataGen","properties": {"schema": "[{\"filedName\":\"id\",\"filedType\":\"INT\",\"kind\":\"sequence\",\"start\":1,\"end\":10000},{\"filedName\":\"name\",\"filedType\":\"STRING\",\"kind\":\"random\",\"length\":15},{\"filedName\":\"age\",\"filedType\":\"INT\",\"kind\":\"random\",\"max\":100,\"min\":1}]","count": "100","ratio": "5"}},{"uuid": "1111","name": "WriteToKafka1","bundle": "cn.piflow.bundle.flink.kafka.WriteToKafka","properties": {"kafka_host": "hadoop01:9092","topic": "test","schema": "","format": "json","properties": "{}"}},{"uuid": "2222","name": "ReadFromKafka1","bundle": "cn.piflow.bundle.flink.kafka.ReadFromKafka","properties": {"kafka_host": "hadoop01:9092","topic": "test","group": "test","startup_mode": "earliest-offset","schema": "id:int,name:string,age:int","format": "json","properties": "{}"}},{"uuid": "3333","name": "ShowData1","bundle": "cn.piflow.bundle.flink.common.ShowData","properties": {"showNumber": "5000"}}],"paths": [{"from": "DataGen1","outport": "","inport": "","to": "WriteToKafka1"},{"from": "WriteToKafka1","outport": "","inport": "","to": "ReadFromKafka1"},{"from": "ReadFromKafka1","outport": "","inport": "","to": "ShowData1"}]}
}
示例说明

本示例演示了通过DataGen组件生成id,name,age3个字段100条数据,每秒生成5条数据,通过WriteToKafka组件将数据写入到kafka的test topic中,然后通过ReadFromKafka组件从test topic中读取数据,最后使用ShowData组件将数据打印在控制台。

字段描述
[{       "filedName": "id","filedType": "INT","kind": "sequence","start": 1,"end": 10000},{       "filedName": "name","filedType": "STRING","kind": "random","length": 15},{       "filedName": "age","filedType": "INT","kind": "random","max": 100,"min": 1} 
]

1.id字段

id字段类型为INT,使用sequence生成器,序列生成器的起始值为1,结束值为10000.

2.name字段

name字段类型为STRING,使用random生成器,生成字符长度为15。

3.age字段

age字段类型为INT,使用random生成器,随机生成器的最小值为1,最大值为100。
在这里插入图片描述

相关文章:

PiflowX组件-ReadFromKafka

ReadFromKafka组件 组件说明 从kafka中读取数据。 计算引擎 flink 有界性 Unbounded 组件分组 kafka 端口 Inport:默认端口 outport:默认端口 组件属性 名称展示名称默认值允许值是否必填描述例子kafka_hostKAFKA_HOST“”无是逗号分隔的Ka…...

Ubuntu 安装MySQL以及基本使用

前言 MySQL是一个开源数据库管理系统,通常作为流行的LAMP(Linux,Apache,MySQL,PHP / Python / Perl)堆栈的一部分安装。它使用关系数据库和SQL(结构化查询语言)来管理其数据。 安装…...

基于Freeswitch实现的Volte网视频通知应用

现在运营商的Volte网络已经很好的支持视频通话了,因此在原来的电话语音通知的基础上,可以更进一步实现视频的通知,让用户有更好的体验,本文就从技术角度,基于Freeswitch来实现此类应用(本文假设读者已对Fre…...

怎么实现Servlet的自动加载

在实际开发时,有时候会希望某些Servlet程序可以在Tomcat启动时随即启动。但在默认情况下,第一次访问servlet的时候,才创建servlet对象。 如果servlet构造函数里面的代码或者init方法里面的代码比较多,就会导致用户第一次访问serv…...

15. Mysql 变量的使用

目录 变量的概述自定义变量系统变量查看系统变量系统变量赋值 局部变量总结参考资料 变量的概述 MySQL支持不同类型的变量,包括自定义变量、系统变量和局部变量。自定义变量是在会话中定义的变量,用于存储临时数据。系统变量是MySQL服务器提供的全局变量…...

为什么ChatGPT采用SSE协议而不是Websocket?

在探索ChatGPT的使用过程中,我们发现GPT采用了流式数据返回的方式。理论上,这种情况可以通过全双工通信协议实现持久化连接,或者依赖于基于EventStream的事件流。然而,ChatGPT选择了后者,也就是本文即将深入探讨的SSE&…...

Elasticsearch:使用 ELSER v2 文本扩展进行语义搜索

Elastic 提供了一个强大的 ELSER 供我们进行语义搜索。ELSER 是一种稀疏向量的搜索方法。我们无需对它做任何的微调及训练。它是一种 out-of-domain 的模型。目前它仅对英文进行支持。希望将来它能对其它的语言支持的更好。更多关于 ELSER 的知识,请参阅文章 “Elas…...

Matlab:BP神经网络算法,二叉决策树

1、BP神经网络算法 (1)步骤 1.准备训练数据和目标值 2.创建并配置BP神经网络模型 3.训练BP神经网络模型 4.用BP神经网络模型预测数据 例:某企业第一年度营业额为132468,第二年度为158948,第三年度为183737,预测第四年度的营…...

Python实现员工管理系统(Django页面版 ) 七

各位小伙伴们好久不见,2024年即将到来,小编在这里提前祝大家新的一年快快乐乐,能够事业有成,学习顺心,家庭和睦,事事顺利。 今天我们本篇要实现的是一个登录界面的实现,其实登录界面的实现看着挺…...

听GPT 讲Rust源代码--src/tools(34)

File: rust/src/tools/clippy/clippy_lints/src/collection_is_never_read.rs 文件"collection_is_never_read.rs"位于Rust源代码中的clippy_lints工具中,其作用是检查在集合类型(如Vec、HashMap等)的实例上执行的操作是否被忽略了…...

k8s的陈述式资源管理(命令行操作)

(一)k8s的陈述式资源管理 1、命令行:kubectl命令行工具——用于一般的资源管理 (1)优点:90%以上ce场景都可以满足 (2)特点:对资源的增、删、查比较方便,对…...

uniapp uview裁剪组件源码修改(u-avatar-cropper),裁出可自定义固定大小图片

u-avatar-cropper修改后 <template><view class"index"><!-- {{userinfo}} --><view class"top"><view class"bg"><image src"../../static/electronic_card/bg.png"></image></view&g…...

【机器学习前置知识】Beta分布

Beta分布与二项分布的关系 Beta分布与二项分布密切相关,由二项分布扩展而来,它是用来描述一个连续型随机变量出现的概率的概率密度分布,表示为 X X X~ B e t a ( a , b ) Beta(a,b) Beta(a,b) , a 、 b a、b a、b 是形状参数。Beta分布本质上也是一个概率密度函数,只是这…...

Notepad++批量更改文件编码格式及文档格式

背景&#xff1a; 在项目中遇到Windows平台VS的MSVC编译不识别Unix下UTF-8编码导致的编译失败问题。需要将Unix下的UTF-8转为UTF-8-BOM格式。网上找了些方式&#xff0c;之后又深入探究了下文档转换的可能性&#xff0c;共享给大家。&#xff08;当然Windows和Unix平台代码格式…...

Linux驱动开发学习笔记6《蜂鸣器实验》

目录 一、蜂鸣器驱动原理 二、硬件原理分析 三、实验程序编写 1、 修改设备树文件 &#xff08;1&#xff09;添加pinctrl节点 &#xff08;2&#xff09;添加BEEP设备节点 &#xff08;3&#xff09;检查PIN 是否被其他外设使用 2、蜂鸣器驱动程序编写 3、编写测试AP…...

鸿蒙(HarmonyOS 3.1) DevEco Studio 3.1开发环境汉化

鸿蒙&#xff08;HarmonyOS 3.1&#xff09; DevEco Studio 3.1开发环境汉化 一、安装环境 操作系统: Windows 10 专业版 IDE:DevEco Studio 3.1 SDK:HarmonyOS 3.1 二、设置过程 打开IDE&#xff0c;在第一个菜单File 中找到Settings...菜单 在Setting...中找到Plugins…...

毫米波雷达:从 3D 走向 4D

1 毫米波雷达已广泛应用于汽车 ADAS 系统 汽车智能驾驶需要感知层、决策层、执行层三大核心系统的高效配合&#xff0c;其中感知层通过传感器探知周围的环境。汽车智能驾驶感知层将真实世界的视觉、物理、事件等信息转变成数字信号&#xff0c;为车辆了解周边环境、制定驾驶操…...

CENTOS docker拉取私服镜像

概述 docker的应用越来越多&#xff0c;安装部署越来越方便&#xff0c;批量自动化的镜像生成和发布都需要docker镜像的拉取。 centos6版本太老&#xff0c;docker的使用过程中问题较多&#xff0c;centos7相对简单容易。 本文档主要介绍centos系统安装docker和拉取docker私…...

【前端面经】即时设计

目录 前言一面git 常见命令跨窗口通信vue 响应式原理发布订阅模式翻转二叉树Promise.all()扁平化数组面试官建议 二面Event Loop 原理Promise 相关css 描边方式requestAnimationReact 18 新特性JSX 相关react 输出两次函数式编程React 批处理机制http请求头有哪些本地存储性能优…...

前端三件套html/css/js的基本认识以及示例程序

简介 本文简要讲解了html,css,js.主要是让大家简要了解网络知识 因为实际开发中很少直接写html&css,所以不必过多纠结,了解一下架构就好 希望深度学习可以参考MDN和w3school HTML 基础 HTML (Hyper Text Markup Language) 不是一门编程语言,而是一种用来告知浏览器如…...

云计算:OpenStack 配置云主机实例的存储挂载并实现外网互通

目录 一、实验 1. 环境 2.配置存储挂载 3.云主机实例连接外部网络&#xff08;SNAT&#xff09; 4.外部网络连接云主机实例&#xff08;DNAT&#xff09; 二、问题 1.云主机 ping 不通外部网络 2.nova list 查看云主机列表报错 3.nova list 与 virsh list --all有何区…...

python/selenium/jenkins整合

1、新建python项目&#xff0c;专门写selenium代码&#xff0c;建议用pytest框架写。 2、把代码上传到代码库中。 3、环境配置&#xff1a; 3.1 在跑jenkins的机器上配置好python环境&#xff0c;需要python --version能在任何地方运行&#xff08;配置好系统环境变量&#…...

华为路由器ACL操作SSH接口

ACL的定义 访问控制列表&#xff08;Access Control Lists&#xff0c;ACL&#xff09;是应用在路由器接口的指令列表。这些指令列表用来告诉路由器哪些数据包可以收、哪些数据包需要拒绝。至于数据包是被接收还是拒绝&#xff0c;可以由类似于源地址、目的地址、端口号等的特…...

Flutter 三点三:Dart Stream

Stream Stream用于接收异步事件Stream 可以接收多个异步事件Stream.listen()方法返回StreamSubscription 可用于取消事件订阅&#xff0c;取消后&#xff0c;不再接收事件 基本使用 Stream.fromFutures([Future.delayed(Duration(seconds: 1),(){return "事件1";})…...

centos 防火墙 设置 LTS

centos 防火墙 设置 LTS https://blog.csdn.net/m0_58805648/article/details/130671008...

SAP缓存 表缓存( Table Buffering)

本文主要介绍SAP中的表缓存在查询数据&#xff0c;更新数据时的工作情况以及对应概念。 SAP表缓存的工作 查询数据 更新数据 删除数据 表缓存的概念 表缓存技术设置属性 不允许缓冲&#xff1a; 允许缓冲&#xff0c;但已关闭&#xff1a; 缓冲已激活&#xff1a; 已…...

Mybatis插件入门

专栏精选 引入Mybatis Mybatis的快速入门 Mybatis的增删改查扩展功能说明 mapper映射的参数和结果 Mybatis复杂类型的结果映射 Mybatis基于注解的结果映射 Mybatis枚举类型处理和类型处理器 再谈动态SQL Mybatis配置入门 Mybatis行为配置之Ⅰ—缓存 Mybatis行为配置…...

DOA估计算法——迭代自适应算法(IAA)

1 简介 迭代自适应法 (Iterative Adaptive Approach&#xff0c;IAA)估计算法最早由美国的电气工程师和数学家Robert Schmidt和Roy A. Kuc在1986年的一篇论文"Multiple Emitter Location and Signal Parameter Estimation"中首次提出了这一算法&#xff0c; IAA DOA …...

Python If语句以及代码块的基本介绍

if语句 在编程中if语句是一种根据条件执行不同代码块的控制结构,他根据条件的真假来分支程序的执行路径,所以我们可以通过if语句根据不同情况而执行不同的程序 格式 if [条件(bool值或者计算结果为bool类型的算式)] : a11if a>10:print("a大于10") # --> a大…...

[嵌入式专栏](FOC - SVPWM扇区计算Part1)

文章目录 1 . 概要2 . 扇区计算2.1 扇区Ⅰ计算2.2 扇区Ⅱ计算2.3 扇区Ⅲ计算 3 . 小结 【极客技术传送门】 : https://blog.csdn.net/Engineer_LU/article/details/135149485 1 . 概要 经过扇区判断后&#xff0c;就知道在哪个扇区进行输出了 【Q】但是每个扇区分别输出怎样的结…...

常用的网站建设程序有那些/建立营销型网站

导读docker 是Linux下面的容器技术&#xff0c;是目前最火的开源技术之一&#xff0c;我们介绍了docker的基本使用&#xff0c;基本命令&#xff0c;本地网络设置&#xff0c;本地仓库等&#xff0c;今天我们介绍下docker的数据卷的使用。一&#xff0c;数据卷的使用有时候需要…...

商丘网站建设方案/长沙百度首页优化排名

前段时间看到在V公司工作的朋友们都开始使用Mac电脑了。 一直对苹果电脑充满向往的我&#xff0c;实在是好心动&#xff08;同时伴随着一小股心痛&#xff0c;只有一小股…这得花多少银子呀…虽然我用得也是Mac….&#xff09; 堆得小山般的Apple iPAD 批量安装Mac Book Pro系统…...

找人做网站需要准备什么材料/济南今日头条新闻

删除软件要删除软件非常简单&#xff0c;只要执行下面的命令就行&#xff1a;# rpm –e xanim这时&#xff0c;用户要注意使用的是软件的名称xanim&#xff0c;而不是软件包的名称xanim-27.64-3.i386.rpm。如果要删除的软件是其它软件所需要的&#xff0c;用户会得到类似下面…...

dw创建网站相册/怎么恶意点击对手竞价

QML (Qt Modeling Language) is a user interface markup language. It is a declarative language for designing user interface–centric applications....

建站用wordpress 起飞了/百度托管运营哪家好

数据流转 理论上&#xff0c;我们需要对系统数据流转的每个节点做监控&#xff0c;收集数据&#xff0c;以便于分析&#xff0c;但受限于环境或时间问题&#xff0c;因此&#xff0c;需要进行简单分类&#xff0c;选择最需要的地方进行监控 系统硬件资源 对于承载应用的最基础设…...

wordpress 做的商城/东莞做网站的公司吗

前言 &#xff1a; 1、 Git是目前世界上最先进的分布式版本控制系统 Git是一个分布式版本控制系统&#xff0c;简单来说就是一个软件用于记录一个或若干文件内容变化&#xff0c;以便于将来查阅特定版本修订情况的软件 2、 Github是一个为用户提供git服务的网站&#xff0c;简单…...