当前位置: 首页 > news >正文

Flink 基于 TDMQ Apache Pulsar 的离线场景使用实践

背景

Apache Flink 是一个开源的流处理和批处理框架,具有高吞吐量、低延迟的流式引擎,支持事件时间处理和状态管理,以及确保在机器故障时的容错性和一次性语义。Flink 的核心是一个分布式流数据处理引擎,支持 Java、Scala、Python 和 SQL 编程语言,可以在集群或云环境中执行数据流程序。它提供了 DataStream API 用于处理有界或无界数据流,DataSet API 用于处理有界数据集,以及 Table API 和 SQL 接口用于关系型流和批处理。目前 Flink 最新已经迭代至 1.20 版本,在此过程中不光是 Flink 框架,插件本身也有部分 API 以及配置存在变更,本文主要针对较高版本的 1.17 Flink Pulsar 插件进行测试验证,目前 Flink 版本如下: https://nightlies.apache.org/flink/

image.png

部署 Flink

设置 Flink 环境配置

参考 Flink 1.17 官方文档,部署 Flink Docker 版本 https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/deployment/resource-providers/standalone/docker/#getting-started
首先配置 Flink 集群 JobManager 和 TaskManager 环境信息,注意由于 Connector Pulsar 会使用到堆外内存,并且默认任务的堆外内存为 0,因此此处需要显式声明堆外内存大小 taskmanager.memory.task.off-heap.size,这里设置为 1GB https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/deployment/memory/mem_setup_tm/#configure-off-heap-memory-direct-or-native

image.png

$ FLINK_PROPERTIES=$'\njobmanager.rpc.address: 
jobmanager\ntaskmanager.memory.task.offheap.size: 
1gb\ntaskmanager.memory.process.size: 4gb'
$ docker network create flink-network

部署 JobManager

配置环境变量后部署 JobManager,这里默认映射端口为 8081,部署后登录 8081 端口可以看到 Flink Dashboard 信息。

$ docker run \--rm \--name=jobmanager \--network flink-network \--publish 8081:8081 \--env FLINK_PROPERTIES="${FLINK_PROPERTIES}" \flink:1.17.2-scala_2.12 jobmanager

image.png

部署 TaskManager

JobManager 是维护协调任务的组件,部署 JobManager 后还需要部署具体运行任务的 TaskManager。

$ docker run \--rm \--name=taskmanager \--network flink-network \--env FLINK_PROPERTIES="${FLINK_PROPERTIES}" \flink:1.17.2-scala_2.12 taskmanager

运行 TaskManager 后,可以在 8081 JobManager 控制台看到 TaskManager 已经被成功注册,至此 Flink Docker 组件部署完成。

image.png

下载 Flink Cli

在本地编译打包 Pulsar 任务后,还需要使用 Flink Cli 提交本地任务到 Flink Docker 集群,从下方网址下载与当前 Docker 版本一致的 Flink 二进制文件并且解压到本地。 https://flink.apache.org/downloads/

image.png

Demo:Topic 复制

参考 Flink Pulsar Connector 社区文档和 Oceanus 相关文档,Demo 使用 1.17 版本 Flink SDK 将命名空间的一个 Topic 消息全部复制到另一个 Topic 中,Demo 主要展示 Flink Connector 的基础用法,没有使用自定义序列化器及反序列化器,而是使用的是 Connector 内置的 String 序列化器。 https://cloud.tencent.com/document/product/849/85885#pulsar-source-.E5.92.8C-sink-.E7.A4.BA.E4.BE.8B https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/connectors/datastream/pulsar/#apache-pulsar-connector

主要逻辑

核心逻辑见下方代码,首先使用 ParameterTool 工具解析命令行中传入的参数,之后根据参数信息使用 Connector Source 和 Sink Builder 方法创建一个从 InputTopic 中获取消息发送到 OutputTopic 的 Flink Stream。

public static void main(String[] args) throws Exception {final ParameterTool parameterTool = ParameterTool.fromArgs(args);
if (parameterTool.getNumberOfParameters() < 2) {System.err.println("Missing parameters!");return;}final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.getConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart(4, 10000));env.enableCheckpointing(60000);env.getConfig().setGlobalJobParameters(parameterTool);String brokerServiceUrl = 
parameterTool.getRequired("broker-service-url");String inputTopic = 
parameterTool.getRequired("input-topic");String outputTopic = 
parameterTool.getRequired("output-topic");String subscriptionName = 
parameterTool.get("subscription-name", "testDuplicate");String token = parameterTool.getRequired("token");//  sourcePulsarSource<String> source = PulsarSource.builder().setServiceUrl(brokerServiceUrl).setStartCursor(StartCursor.latest()).setTopics(inputTopic).setDeserializationSchema(new 
SimpleStringSchema()).setSubscriptionName(subscriptionName).setAuthentication("org.apache.pulsar.client.impl.auth.AuthenticationToken", token).build();DataStream<String> stream = env.fromSource(source, 
WatermarkStrategy.noWatermarks(), "Pulsar Source");//  sinkPulsarSink<String> sink = PulsarSink.builder().setServiceUrl(brokerServiceUrl).setTopics(outputTopic).setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE).setAuthentication("org.apache.pulsar.client.impl.auth.AuthenticationToken", token) .setConfig(PulsarSinkOptions.PULSAR_BATCHING_ENABLED, false).setSerializationSchema(new 
SimpleStringSchema()).build();stream.sinkTo(sink);env.execute("Pulsar Streaming Message Duplication");
}

验证

在 TDMQ Pulsar 版控制台创建流入 Topic NinjaDuplicationInput1 和流出 Topic NinjaDuplicationOutput1。

image.png

代码编译为 Jar 包后,本地上传 Jar 包到 Flink Docker,从角色管理界面获取具有生产和消费角色的 token,命令如下所示:

/usr/local/services/flink/flink-1.17.2 # 
/usr/local/services/flink/flink-1.17.2/bin/flink run 
/tmp/wordCount/pulsar-flink-examples-0.0.1-SNAPSHOT-jar-with-dependencies.jar \--broker-service-url http://pulsar-xxxxx \--input-topic 
pulsar-g8akj4eow8z8/dev-tdmq-ninjazhou-1713856927/ninjaDuplicationInput1 \--outputtopic 
pulsarg8akj4eow8z8/devtdmqninjazhou1713856927/ninjaDuplicationOutput1 \--subscription-name ninjaTest1 \--token 
eyJrZXlJZCI6InB1bHNhci1nOGFrajRlb3c4ejgiLCJhbGciOiJIUzI1NiJ9.eyJzdWIiOiJwdWxzYXItZzhha2o0ZW93OHo4X2Rldi10ZG1xLW5pbmphemhvdS0xNzEzODU2OTI3LWRldmNsb3VkIn0.O89TuTl0PMca7tLN9aGvHvqt7QZ9yMrJh1z3VOz7EVcJob has been submitted with JobID 
c1bdab89c01ef16e00579bd2c6648859

提交任务后,可以看到 Flink Dashboard 出现对应任务,并且状态处于 Running。

image.png

在命令行往 NinjaDuplicationInput1 Topic 发送消息。

/usr/local/services/pulsar/apache-pulsar-2.9.5/bin/pulsar-client \
--url http://pulsar-xxxxxx \
--auth-params token:eyJrZXlJZCI6InB1bHNhci1nOGFrajRlb3c4ejgiLCJhbGciOiJIUzI1NiJ9.eyJzdWIiOiJwdWxzYXItZzhha2o0ZW93OHo4X2Rldi10ZG1xLW5pbmphemhvdS0xNzEzODU2OTI3LWRldmNsb3VkIn0.O89TuTl0PMca7tLN9aGvHvqt7QZ9yMrJh1z3VOz7EVc  \
--auth-plugin org.apache.pulsar.client.impl.auth.AuthenticationToken \
produce \
-m "i am the bone of my sword" \
-n 5 \
pulsar-g8akj4eow8z8/dev-tdmq-ninjazhou-1713856927/ninjaDuplicationInput1

消息发送完成后,可以在消息查询控制台观察到目标 Topic NinjaDuplicationOutput1 也出现了五条消息,并且消息内容和发送消息一致。

image.png

image.png

查看 Docker TaskManager 标准输出也能观察到 Sink 往目标 Topic 发送消息的日志。

image.png

Demo:单词计数

单词计数作为 Flink 中最常见的 Demo,能够比较好的阐述 Flink 的流处理思想。此 Demo 参考 StreamNative 的 Demo,使用 1.17 Flink SDK,将 Pulsar Topic 作为源和目标资源,统计源 Topic 消息中每个时间窗口各个单词出现的次数,并且将结果投递到目标 Topic 中。 https://github.com/streamnative/examples/blob/master/pulsar-flink/README.md

主要逻辑

整体 Demo 项目文件见下方链接 pulsar-flink-example.zip 核心逻辑见下方代码,首先使用 ParameterTool 工具解析命令行中传入的参数,之后使用 Flink 内置的反序列化器解析消息体为字符串,在数据处理部分使用系统时间窗口统计时间窗内流入的消息,并且对于每个出现的单词汇聚生成 WordCount 对象,最后使用自定义的序列化器,将 WordCount 对象序列化为 Json 字节数组,投递到目标 Topic 中。 目前 TDMQ pulsar Connector 支持 Pulsar Schema、Flink Schema 以及自定义序列化器三种方法将 Java 对象序列化为 Pulsar Sink 的字节数组消息体。推荐代码使用自定义序列化器的方式序列化定义的 WordCount 对象
https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/connectors/datastream/pulsar/#serializer
还需要注意默认 Sink 配置是开启 Batch Send 模式的,在控制台消息查询时,Batch Message 只会查询到 Batch 中的第一条消息,不利于对照消息数量,Demo 中关闭了 Batch Send 功能。

/*** 参考 streamNative pulsar flink demo* <a href="https://github.com/streamnative/examples/tree/master/pulsar-flink">pulsar-flink example</a>* 由于上方链接的 streamNative flink demo 使用 1.10.1 版本 flink 以及 2.4.17 版本 pulsar connector,* 与当前 1.20 社区版本的 flink 和 pulsar connector api 已经存在部分 api 差异* 因此本 demo 使用 1.17 flink 版本进行重构* <a href="https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/deployment/resource-providers/standalone/overview/">1.17 flink doc</a>* <p>* demo 统计时间窗口内源 topic 所有消息中每个单词出现频率次数* 并且将统计结果按照每个单词对应一条消息的格式,序列化后消息后投递到目标 topic 中**/
public class PulsarStreamingWordCount {private static final Logger LOG = LoggerFactory.getLogger(PulsarStreamingWordCount.class);public static void main(String[] args) throws Exception 
{//  解析任务传参//  默认使用 authToken 方式鉴权final ParameterTool parameterTool = 
ParameterTool.fromArgs(args);if (parameterTool.getNumberOfParameters() < 2) {System.err.println("Missing parameters!");return;}final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.getConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart(4, 10000));env.enableCheckpointing(60000);
env.getConfig().setGlobalJobParameters(parameterTool);String brokerServiceUrl = 
parameterTool.getRequired("broker-service-url");String inputTopic = 
parameterTool.getRequired("input-topic");String outputTopic = 
parameterTool.getRequired("output-topic");String subscriptionName = 
parameterTool.get("subscription-name", "WordCountTest");String token = parameterTool.getRequired("token");int timeWindowSecond = parameterTool.getInt("time-window", 60);//  sourcePulsarSource<String> source = 
PulsarSource.builder().setServiceUrl(brokerServiceUrl).setStartCursor(StartCursor.latest()).setTopics(inputTopic)//  此处将 message 中的 payload 序列化成字符串类型//  目前 source 只支持解析消息 payload 中的内容,将 payload 中的内容解析成 pulsar schema 对象或者自定义的 class 对象//  而无法解析 message 中 properties 中的其他属性,例如 publish_time//  如果需要解析 message 中的 properties,需要在继承类中实现 PulsarDeserializationSchema.getProducedType() 方法//  getProducedType 这个方法实现较为繁琐,需要声明每个反序列化后的属性//  
https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/connectors/datastream/pulsar/#deserializer.setDeserializationSchema(new 
SimpleStringSchema()).setSubscriptionName(subscriptionName)
.setAuthentication("org.apache.pulsar.client.impl.auth.AuthenticationToken", token).build();//  由于此处没有使用消息体中的时间,即没有使用消息的 publish_time//  因此此处使用 noWatermark 模式,使用 taskManager 的时间作为时间窗口DataStream<String> stream = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Pulsar Source");//  process//  解析 source 中每行消息,通过空格分割成单个单词,之后进行汇聚处理并且初始化成 WordCount 结构体//  这里使用 TumblingProcessingTimeWindows,即使用当前 taskManager 系统时间计算时间窗口DataStream<WordCount> wc = stream.flatMap((FlatMapFunction<String, WordCount>) (line, collector) -> {LOG.info("current line = {}, word list = {}", line, line.split("\\s"));for (String word : line.split("\\s")) {collector.collect(new 
WordCount(word, 1, null));}}).returns(WordCount.class).keyBy(WordCount::getWord).window(TumblingProcessingTimeWindows.of(Time.seconds(timeWindowSecond))).reduce((ReduceFunction<WordCount>) (c1, c2) -> {WordCount reducedWordCount = new WordCount(c1.getWord(), c1.getCount() + c2.getCount(), null);LOG.info("previous [{}] [{}], current wordCount {}", c1, c2, reducedWordCount);return reducedWordCount;});//  sink//  目前 1.17 flink 序列化提供了两种已经实现的方法,一种是使用 pulsar 内置 schema,另一种是使用 flink 的 schema//  但由于目前 tdmq pulsar 提供的是 2.9 版本的 pulsar,对于 schema 支持还不够完善//  此处使用 flink PulsarSerializationSchema<T> 提供的接口,当前主要需要实现 serialize(IN element, PulsarSinkContext sinkContext) 方法
//  将传入的 IN 对象自定义序列化为 byte 数组
//  https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/connectors/datastream/pulsar/#serializer
PulsarSink<WordCount> sink = PulsarSink.builder()
.setServiceUrl(brokerServiceUrl)
.setTopics(outputTopic)
.setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
.setAuthentication("org.apache.pulsar.client.impl.auth.AuthenticationToken", token)
.setConfig(PulsarSinkOptions.PULSAR_BATCHING_ENABLED, false)
.setSerializationSchema(new PulsarSerializationSchema<WordCount>() {
private ObjectMapper objectMapper;
@Override
public void open(
SerializationSchema.InitializationContext initializationContext,
PulsarSinkContext sinkContext,
SinkConfiguration sinkConfiguration)
throws Exception {objectMapper = new ObjectMapper();
}
@Override
public PulsarMessage<?> serialize(WordCount wordCount, PulsarSinkContext sinkContext) {
//  此处将 wordCount 添加处理时间后,将 wordCount 使用 json 方式序列化为 byte 数组
//  以便能够直接查看消息体内容
byte[] wordCountBytes;wordCount.setSinkDateTime(LocalDateTime.now().toString());
try {wordCountBytes = objectMapper.writeValueAsBytes(wordCount);
} catch (Exception exception) {wordCountBytes = exception.getMessage().getBytes();
}
return PulsarMessage.builder(wordCountBytes).build();
}
})
.build();wc.sinkTo(sink);env.execute("Pulsar Streaming WordCount");
}
}

验证

在 TDMQ Pulsar 版控制台创建流入 Topic NinjaWordCountInput1 和流出 Topic NinjaWordCountOutput1。

image.png

代码编译为 Jar 包后,本地上传 Jar 包到 Flink Docker,从角色管理界面获取具有生产和消费角色的 Token,命令如下所示。

/usr/local/services/flink/flink-1.17.2 # /usr/local/services/flink/flink-1.17.2/bin/flink run /tmp/wordCount/pulsar-flink-examples-0.0.1-SNAPSHOT-jar-with-dependencies.jar \--broker-service-url http://pulsar-xxxx \--input-topic pulsar-g8akj4eow8z8/dev-tdmq-ninjazhou-1713856927/ninjaWordCountInput1 \--output-topic pulsar-g8akj4eow8z8/dev-tdmq-ninjazhou-1713856927/ninjaWordCountOutput1 \--subscription-name ninjaTest3 \--token eyJrZXlJZCI6InB1bHNhci1nOGFrajRlb3c4ejgiLCJhbGciOiJIUzI1NiJ9.eyJzdWIiOiJwdWxzYXItZzhha2o0ZW93OHo4X2Rldi10ZG1xLW5pbmphemhvdS0xNzEzODU2OTI3LWRldmNsb3VkIn0.O89TuTl0PMca7tLN9aGvHvqt7QZ9yMrJh1z3VOz7EVcJob has been submitted with JobID 6f608d95506f96c3eac012386f840655

提交任务后,可以看到 Flink Dashboard 出现对应任务,并且状态处于 Running。

image.png

在命令行往 NinjaWordCountInput1 Topic 发送消息,此处一共发送两批消息,第一批发送 i am the bone of my sword 5 次,第二批发送 Test1 3 次。

/usr/local/services/pulsar/apache-pulsar-2.9.5/bin/pulsar-client \
--url http://pulsar-xxx \
--auth-params token:eyJrZXlJZCI6InB1bHNhci1nOGFrajRlb3c4ejgiLCJhbGciOiJIUzI1NiJ9.eyJzdWIiOiJwdWxzYXItZzhha2o0ZW93OHo4X2Rldi10ZG1xLW5pbmphemhvdS0xNzEzODU2OTI3LWRldmNsb3VkIn0.O89TuTl0PMca7tLN9aGvHvqt7QZ9yMrJh1z3VOz7EVc  \
--auth-plugin org.apache.pulsar.client.impl.auth.AuthenticationToken \
produce \
-m "i am the bone of my sword" \
-n 5 \
pulsar-g8akj4eow8z8/dev-tdmq-ninjazhou-1713856927/ninjaWordCountInput1
/usr/local/services/pulsar/apache-pulsar-2.9.5/bin/pulsar-client \
--url http://pulsar-g8akj4eow8z8.sap-8ywks40k.tdmq.ap-gz.qcloud.tencenttdmq.com:8080 \
--auth-params token:eyJrZXlJZCI6InB1bHNhci1nOGFrajRlb3c4ejgiLCJhbGciOiJIUzI1NiJ9.eyJzdWIiOiJwdWxzYXItZzhha2o0ZW93OHo4X2Rldi10ZG1xLW5pbmphemhvdS0xNzEzODU2OTI3LWRldmNsb3VkIn0.O89TuTl0PMca7tLN9aGvHvqt7QZ9yMrJh1z3VOz7EVc  \
--auth-plugin org.apache.pulsar.client.impl.auth.AuthenticationToken \
produce \
-m "test1" \
-n 3 \
pulsar-g8akj4eow8z8/dev-tdmq-ninjazhou-1713856927/ninjaWordCountInput1

消息发送完成后,可以在消息查询控制台观察到目标 Topic NinjaWordCountOutput1 出现了 8 条消息。

image.png

每条消息为单词名称,单词出现的次数,单词处理的时间点的 Json 字节数组,下图为 am 单词的消息结构,可以发现出现数量与投递消息数吻合,证明任务运行正常。

image.png

查看 TaskManager 可以看出消息体,以及每次解析的消息过程。

image.png

Flink Connector 用法总结

版本选择

目前 Flink 插件生产和消费经过调研,在不进行管控改造以及非标操作的情况下,能满足基本的 TDMQ Pulsar 版使用需求。截至现在 Apache Flink 已经发布 1.20 版本,目前推荐使用 Apache Flink 1.15-1.17 对应 Pulsar Connector,不推荐使用 1.15 以下版本,1.18 及以上版本可以参考 1.17 版本使用。

下面介绍 1.15 和 1.17 版本 Pulsar Flink Connector 主要配置。Flink 版本对应的 Flink Connector 依赖可以在 Pulsar Connector Dependencies 处获取。https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/connectors/datastream/pulsar/#dependency

image.png

各个版本文档链接: https://nightlies.apache.org/flink/

1.17 Flink Pulsar Connector

代码依赖

Java 项目中引入相关依赖,以 Maven 工程为例,在 pom.xml 添加以下依赖:

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-base</artifactId>
<version>1.17.2</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-pulsar</artifactId>
<version>4.1.0-1.17</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
<version>1.17.2</version>
</dependency>

Source 代码示例

PulsarSource<String> source = PulsarSource.builder()
.setServiceUrl(brokerServiceUrl)
.setStartCursor(StartCursor.latest())
.setTopics(inputTopic)
.setDeserializationSchema(new SimpleStringSchema())
.setSubscriptionName(subscriptionName)
.setAuthentication("org.apache.pulsar.client.impl.auth.AuthenticationToken", token)
.build();

Source 参数说明

Connector Source 全部参数可参考 官方文档 ,下表是常用配置参数。

参数名称描述
setServiceUrlTDMQ Pulsar 版接入地址,例如 http://pulsar-xxx:8080
setStartCursor任务起始 topic 位点,目前支持 earliest,latest,消息 id 以及消息时间位点四种配置。需要注意如果任务对应订阅已经存在,则会优先直接使用订阅位点
setTopicstopic 名称,例如 pulsar-xxxxx/dev-tdmq-ninjazhou-1713856927/ninjaWordCountInput1
setDeserializationSchema反序列化消息schema,此处建议使用 Flink 内置的字符串反序列化器 SimpleStringSchema,或者使用 Pulsar 的字符串反序列化器 StringSchema,将消息转换成字符串后,再在业务代码中将字符串转换成自定义的对象
setSubscriptionName订阅名称
setAuthentication鉴权类,目前 tdmq pulsar 统一使用 jwt token 方式鉴权,因此此处固定填写为 setAuthentication(“org.apache.pulsar.client.impl.auth.AuthenticationToken”, )。token 填写 TDMQ 控制台角色秘钥,需要保证秘钥拥有对应 topic 消费权限

sink 代码示例

PulsarSink<String> sink = PulsarSink.builder()
.setServiceUrl(brokerServiceUrl)
.setTopics(outputTopic)
.setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
.setAuthentication("org.apache.pulsar.client.impl.auth.AuthenticationToken", token)
.setSerializationSchema(new SimpleStringSchema())
.build();

Sink 参数说明

Connector Sink 全部参数可参考 官方文档 ,下表是常用配置参数。

参数名称描述
setServiceUrlTDMQ Pulsar 版接入地址,例如 http://pulsar-xxx:8080
setTopicstopic 名称,例如 pulsar-xxxx/dev-tdmq-ninjazhou-1713856927/ninjaWordCountOutput1
setSerializationSchema序列化器,将变量序列化为字节数组。推荐自定义实现序列化参数接口,见下文注意事项
setDeliveryGuarantee传输可靠性保证,官方可选参数为 NONE,AT_LEAST_ONCE,EXACTLY_ONCE。由于 EXACTLY_ONCE 需要事务保证,此处只建议填写 AT_LEAST_ONCE,NONE
setAuthentication鉴权类,目前 TDMQ Pulsar 版统一使用 jwt token 方式鉴权,因此此处固定填写为 setAuthentication(“org.apache.pulsar.client.impl.auth.AuthenticationToken”, )。token 填写 TDMQ 控制台角色秘钥,需要保证秘钥拥有对应 topic 生产权限

1.15 flink pulsar connector

代码依赖

Java 项目中引入相关依赖,以 Maven 工程为例,在 pom.xml 添加以下依赖:

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-base</artifactId>
<version>1.15.4</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-pulsar</artifactId>
<version>1.15.4</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
<version>1.15.4</version>
</dependency>   

Source 代码示例

PulsarSource<String> source = PulsarSource.builder()
.setServiceUrl(brokerServiceUrl)
.setAdminUrl(brokerServiceUrl)
.setStartCursor(StartCursor.latest())
.setTopics(inputTopic)
.setDeserializationSchema(PulsarDeserializationSchema.flinkSchema(new SimpleStringSchema()))
.setSubscriptionName(subscriptionName)
.setSubscriptionType(SubscriptionType.Exclusive)
.setConfig(PulsarOptions.PULSAR_AUTH_PLUGIN_CLASS_NAME, "org.apache.pulsar.client.impl.auth.AuthenticationToken")
.setConfig(PulsarOptions.PULSAR_AUTH_PARAMS, token)
.setConfig(PulsarSourceOptions.PULSAR_ENABLE_AUTO_ACKNOWLEDGE_MESSAGE, true)
.build();

Source 参数说明

connector source 全部参数可参考 官方文档 ,下表是常用配置参数。

参数名称描述
setServiceUrlTDMQ Pulsar 版接入地址,例如 http://pulsar-xxxxx:8080
setStartCursor任务起始 topic 位点,目前支持 earliest,latest,消息 id 以及消息时间位点四种配置。需要注意如果任务对应订阅已经存在,则会优先直接使用订阅位点
setTopicstopic 名称,例如 pulsar-xxxx/ninjaWordCountInput1
setDeserializationSchema反序列化消息 schema,此处建议使用 Flink 内置的字符串反序列化器 SimpleStringSchema,或者使用 Pulsar 的字符串反序列化器 StringSchema,将消息转换成字符串后,再在业务代码中将字符串转换成自定义的对象
setSubscriptionName订阅名称
setConfig(PulsarOptions.PULSAR_AUTH_PLUGIN_CLASS_NAME)鉴权类,目前 TDMQ Pulsar 版统一使用 jwt token 方式鉴权,因此此处固定填写为 setConfig(PulsarOptions.PULSAR_AUTH_PLUGIN_CLASS_NAME,“org.apache.pulsar.client.impl.auth.AuthenticationToken”)
setConfig(PulsarOptions.PULSAR_AUTH_PARAMS)鉴权值,目前TDMQ Pulsar 版 统一使用 jwt token 方式鉴权,因此此处固定填写为 setConfig(PulsarOptions.PULSAR_AUTH_PARAMS, ),token 填写 tdmq 控制台角色秘钥,需要保证秘钥拥有对应 topic 消费权限
setAdminUrl管控接入点地址,低版本 connector 需要使用此参数执行创建事务,修改 cursor 位点等管控操作,此处传入地址与 setServiceUrl 中相同
setSubscriptionType低版本 connector 需要指定订阅类型,而高版本默认使用 Exclusive 模式创建订阅。由于 shared 模式依赖事务 ack 消息,并且 pulsar connector 在初始化时已经会将分区 topic 的每个分区都创建 flink 分片,此时使用 shared 模式意义不大,因此在高版本中已经把 shared 模式去除。具体可以参考 [FLINK-30413] Drop Shared and Key_Shared subscription support in Pulsar connector - ASF JIRA 此处只推荐 Exclusive 或 Failover 订阅模式
setConfig(PulsarSourceOptions.PULSAR_ENABLE_AUTO_ACKNOWLEDGE_MESSAGE)如果不开启该参数,插件会依赖事务提交 ack 信息,否则在 Exclusive 和 Failover 订阅模式下会按照 autoCommitCursorInterval 设置的时间间隔自动 ack 拉取的消息,这里需要设置为 setConfig(PulsarSourceOptions.PULSAR_ENABLE_AUTO_ACKNOWLEDGE_MESSAGE, true)

Sink 代码示例

PulsarSink<String> sink = PulsarSink.builder()
.setServiceUrl(brokerServiceUrl)
.setAdminUrl(brokerServiceUrl)
.setTopics(outputTopic)
.setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
.setConfig(PulsarOptions.PULSAR_AUTH_PLUGIN_CLASS_NAME, "org.apache.pulsar.client.impl.auth.AuthenticationToken")
.setConfig(PulsarOptions.PULSAR_AUTH_PARAMS, token)
.setSerializationSchema(PulsarSerializationSchema.flinkSchema(new SimpleStringSchema()))
.build();

Sink 参数说明

Connector Sink 全部参数可参考 官方文档 ,下表是常用配置参数。

参数名称描述
setServiceUrlTDMQ Pulsar 版接入地址,例如 http://pulsar-xxxx:8080
setTopicstopic 名称,例如 pulsar-xxxx/dev-tdmq-ninjazhou-1713856927/ninjaWordCountOutput1
setSerializationSchema序列化器,将变量序列化为字节数组。推荐自定义实现序列化参数接口,见下文注意事项
setDeliveryGuarantee传输可靠性保证,官方可选参数为 NONE,AT_LEAST_ONCE,EXACTLY_ONCE。由于 EXACTLY_ONCE 需要事务保证,此处只建议填写 AT_LEAST_ONCE,NONE
setAdminUrl管控接入点地址,低版本 connector 需要使用此参数执行创建事务,修改 cursor 位点等管控操作,此处传入地址与 setServiceUrl 中相同
setConfig(PulsarOptions.PULSAR_AUTH_PLUGIN_CLASS_NAME)鉴权类,目前 TDMQ Pulsar 版统一使用 jwt token 方式鉴权,因此此处固定填写为 setConfig(PulsarOptions.PULSAR_AUTH_PLUGIN_CLASS_NAME,“org.apache.pulsar.client.impl.auth.AuthenticationToken”)
setConfig(PulsarOptions.PULSAR_AUTH_PARAMS)鉴权值,目前 TDMQ Pulsar 版统一使用 jwt token 方式鉴权,因此此处固定填写为 setConfig(PulsarOptions.PULSAR_AUTH_PARAMS, ),token 填写 TDMQ 控制台角色秘钥,需要保证秘钥拥有对应 topic 生产权限

注意事项

1.  由于 Connector Pulsar 会使用到堆外内存,并且默认任务的堆外内存为 0,因此执行 Pulsar Job 需要显式声明堆外内存大小 taskmanager.memory.task.off-heap.size,例如 1gb。

2.  SetSerializationSchema 反序列化提供了两种已经实现的方法,一种是使用 Pulsar 内置 Schema,另一种是使用 Flink 的 Schema。但这两种方法都会造成业务代码与 Schema 耦合。目前建议实现 PulsarSerializationSchema 接口,主要需要实现 Serialize(IN element, PulsarSinkContext sinkContext) 方法,将传入的 IN 对象自定义序列化为 Byte 数组。

3.  目前 Sink 默认开启 Enable_batch 批量投递模式,会将消息打包后投递。如果想要关闭批量投递功能,可以配置 SetConfig(PulsarSinkOptions.PULSAR_BATCHING_ENABLED, False)。

4.  Flink 时间窗口支持两种 时间获取方式 ,一种直接使用任务的系统时间 ProcessTime,另一种是事件自带时间 EventTime。但目前 Source 只支持解析消息 Payload 中的内容,将 Payload 中的内容解析成 Pulsar Schema 对象或者自定义的 Class 对象,而无法解析 Message 中 Properties 中的其他属性,例如 消息上传时间 Publish_time。如果需要解析 message 中的 Properties,根据文档 需要在继承类中 实现 PulsarDeserializationSchema.getProducedType() 方法。这个方法实现较为繁琐,需要声明每个反序列化后的属性,因此目前建议直接使用 ProcessTime 作为时间窗口时间。

5.  1.16 及以下版本 Flink Source 的 SetSubscriptionType 方法还保留了 Shared 和 Key_shared 订阅模式,这两种订阅模式依赖事务 ACK 消息,并且只有当任务 checkpoint 更新时才会统一提交事务和 ACK。但由于目前 TDMQ Pulsar 没有开放事务功能,因此当前不能同时配置 SetSubscriptionType(SubscriptionType.Shared) 和 SetConfig(PulsarSourceOptions.PULSAR_ENABLE_AUTO_ACKNOWLEDGE_MESSAGE, False) 参数。

6.  Oceanus 内置 Pulsar Connector 是基于 StreamNative 版本,适配 flink 1.13-1.14 版本的 connector,这两个版本较老,与新版本存在较多 API 不兼容,如果使用 Oceanus 内置版本 Pulsar Connector 与高版本 Flink,可能需要较多代码改造。

相关文章:

Flink 基于 TDMQ Apache Pulsar 的离线场景使用实践

背景 Apache Flink 是一个开源的流处理和批处理框架&#xff0c;具有高吞吐量、低延迟的流式引擎&#xff0c;支持事件时间处理和状态管理&#xff0c;以及确保在机器故障时的容错性和一次性语义。Flink 的核心是一个分布式流数据处理引擎&#xff0c;支持 Java、Scala、Pytho…...

远程访问及控制

SSH协议 是一种安全通道协议 对通信数据进行了加密处理&#xff0c;用于远程管理 OpenSSH(SSH由OpenSSH提供) 服务名称&#xff1a;sshd 服务端控制程序&#xff1a; /usr/sbin/sshd 服务端配置文件&#xff1a; /etc/ssh/sshd_config ssh存放的客户端的配置文件 ssh是服务端额…...

【代码随想录训练营】【Day 44】【动态规划-4】| 卡码 46, Leetcode 416

【代码随想录训练营】【Day 44】【动态规划-4】| 卡码 46&#xff0c; Leetcode 416 需强化知识点 背包理论知识 题目 卡码 46. 携带研究材料 01 背包理论基础01 背包理论基础&#xff08;滚动数组&#xff09;01 背包 二维版本&#xff1a;dp[i][j] 表示从下标为[0-i]的物…...

html5实现个人网站源码

文章目录 1.设计来源1.1 网站首页页面1.2 个人工具页面1.3 个人日志页面1.4 个人相册页面1.5 给我留言页面 2.效果和源码2.1 动态效果2.2 目录结构 源码下载 作者&#xff1a;xcLeigh 文章地址&#xff1a;https://blog.csdn.net/weixin_43151418/article/details/139564407 ht…...

【内存管理】内存布局

ARM32位系统的内存布局图 32位操作系统的内存布局很经典&#xff0c;很多书籍都是以32位系统为例子去讲解的。32位的系统可访问的地址空间为4GB&#xff0c;用户空间为1GB ~ 3GB&#xff0c;内核空间为3GB ~ 4GB。 为什么要划分为用户空间和内核空间呢&#xff1f; 一般处理器…...

软件试运行方案(Word)

软件试运行方案&#xff08;直接套用实际项目&#xff0c;原件获取通过本文末个人名片直接获取。&#xff09; 一、试运行目的 二、试运行的准备 三、试运行时间 四、试运行制度 五、试运行具体内容与要求...

Redis原理篇——哨兵机制

Redis原理篇——哨兵机制 1.Redis哨兵2.哨兵工作原理2.1.哨兵作用2.2.状态监控2.3.选举leader2.4.failover 1.Redis哨兵 主从结构中master节点的作用非常重要&#xff0c;一旦故障就会导致集群不可用。那么有什么办法能保证主从集群的高可用性呢&#xff1f; 2.哨兵工作原理 …...

web前端的MySQL:跨领域之旅的探索与困惑

web前端的MySQL&#xff1a;跨领域之旅的探索与困惑 在数字化浪潮的推动下&#xff0c;web前端与MySQL数据库似乎成为了两个不可或缺的领域。然而&#xff0c;当我们将这两者放在一起&#xff0c;尝试探索web前端与MySQL之间的交互与关联时&#xff0c;却发现这是一次充满困惑…...

Postgresql源码(135)生成执行计划——Var的调整set_plan_references

1 总结 set_plan_references主要有两个功能&#xff1a; 拉平&#xff1a;生成拉平后的RTE列表&#xff08;add_rtes_to_flat_rtable&#xff09;。调整&#xff1a;调整前每一层计划中varno的引用都是相对于本层RTE的偏移量。放在一个整体计划后&#xff0c;需要指向一个统一…...

Python魔法之旅专栏(导航)

目录 推荐阅读 1、Python筑基之旅 2、Python函数之旅 3、Python算法之旅 4、博客个人主页 首先&#xff0c;感谢老铁们一直以来对我的支持与厚爱&#xff0c;让我能坚持把Python魔法方法专栏更新完毕&#xff01; 其次&#xff0c;为了方便大家查阅&#xff0c;我将此专栏…...

Python第二语言(五、Python文件相关操作)

目录 1. 文件编码的概念 2. 文件的读取操作 2.1 什么是文件 2.2 open()打开函数 2.3 mode常用的三种基础访问模式 2.4 文件操作及案例 3. 文件的写入操作及刷新文件&#xff1a;write与flush 4. 文件的追加操作 5. 文件操作的综合案例&#xff08;文件备份操作&#x…...

Vue3 组合式 API:依赖注入(四)

provide() provide() 函数是用于依赖注入的一个关键部分。这个函数允许你在组件树中提供一个值或对象&#xff0c;使得任何子组件&#xff08;无论层级多深&#xff09;都能够通过 inject() 函数来访问这些值。 import { provide, ref } from vue; export default { setup(…...

Vue如何引入ElementUI并使用

Element UI详细介绍 Element UI是一个基于Vue 2.0的桌面端组件库&#xff0c;旨在构建简洁、快速的用户界面。由饿了么前端团队开发&#xff0c;提供丰富的组件和工具&#xff0c;帮助开发者快速构建高质量的Vue应用&#xff0c;并且以开放源代码的形式提供。 1. VueElementU…...

VS2019 QT无法打开 源 文件 “QTcpSocket“

VS2019 QT无法打开 源 文件 "QTcpSocket" QT5.15.2_msvc2019_64 严重性 代码 说明 项目 文件 行 禁止显示状态 错误(活动) E1696 无法打开 源 文件 "QTcpSocket" auto_pack_line_demo D:\vs_qt_project\auto_pack_line_de…...

【Golang】Map 稳定有序遍历的实现与探索:保序遍历之道

【Golang】Map 稳定有序遍历的实现与探索&#xff1a;保序遍历之道 大家好 我是寸铁&#x1f44a; 总结了一篇【Golang】Map 稳定有序遍历的实现与探索&#xff1a;保序遍历之道✨ 喜欢的小伙伴可以点点关注 &#x1f49d; 前言&#x1f34e; 在计算机科学中&#xff0c;数据结…...

使用Nextjs学习(学习+项目完整版本)

创建项目 运行如下命令 npx create-next-app next-create创建项目中出现的各种提示直接走默认的就行,一直回车就行了 创建完成后进入到项目运行localhost:3000访问页面,如果和我下面页面一样就是创建项目成功了 整理项目 将app/globals.css里面的样式都删除,只留下最上面三…...

KUKA机器人KRC5控制柜面板LED显示

对于KUKA机器人新系列控制柜KRC5控制柜来说&#xff0c;其控制柜面板LED布局如下图&#xff1a; 其中①②③④分别为&#xff1a; 1、机器人控制柜处于不同状态时&#xff0c;LED显示如下&#xff1a; 2、机器人控制柜正在运行时&#xff1a; 3、机器人控制柜运行时出现的故障…...

为什么选择Python作为AI开发语言

为什么Python适合AI 在当前的科技浪潮中&#xff0c;人工智能&#xff08;AI&#xff09;无疑是最热门的话题之一。无论是自动驾驶、智能推荐还是自然语言处理&#xff0c;AI都在不断改变我们的生活。而在这场技术革命中&#xff0c;Python作为主要的编程语言之一&#xff0c;…...

【算法篇】求最长公共前缀JavaScript版本

题目描述 给你一个大小为 n 的字符串数组 strs &#xff0c;其中包含n个字符串 , 编写一个函数来查找字符串数组中的最长公共前缀&#xff0c;返回这个公共前缀。 数据范围&#xff1a; 数据范围:0<n<5000&#xff0c;0<len(strsi)< 5000 进阶:空间复杂度 O(1)&a…...

搭建RocketMQ主从异步集群

搭建RocketMQ主从异步集群 1、RocketMQ集群模式 为了追求更好的性能&#xff0c;RocketMQ的最佳实践方式都是在集群模式下完成的。RocketMQ官方提供了三种集群搭建方式&#xff1a; 2主2从异步通信方式&#xff1a;使用异步方式进行主从之间的数据复制。吞吐量大&#xff0c;…...

最大子段和问题

最大子段和问题 分数 15 全屏浏览 切换布局 作者 王东 单位 贵州师范学院 最大子段和问题。给定由n个整数组成的序列&#xff0c;求序列中子段的最大和&#xff0c;若所有整数均为负整数时定义最大子段和为0。 输入格式: 第一行输入整数个数n&#xff08;1≤n≤1000&…...

Vue3中的常见组件通信之mitt

Vue3中的常见组件通信之mitt 概述 ​ 在vue3中常见的组件通信有props、mitt、v-model、 r e f s 、 refs、 refs、parent、provide、inject、pinia、slot等。不同的组件关系用不同的传递方式。常见的撘配形式如下表所示。 组件关系传递方式父传子1. props2. v-model3. $refs…...

MySQL快速入门(极简)

SQL 介绍及 MySQL 安装 一、实验简介 本课程为实验楼提供的 MySQL 实验教程&#xff0c;所有的步骤都在实验楼在线实验环境中完成&#xff0c;学习中请按照实验步骤依次操作。 本课程为 SQL 基本语法及 MySQL 基本操作的实验&#xff0c;理论内容较少&#xff0c;动手实践多…...

CentOS7安装NVIDIA显卡驱动指引【笔记】

CentOS7安装NVIDIA显卡驱动指引【笔记】 实践设备:华硕FX-PRO(NVIDIA GeForce GTX 960M) 环境准备: 1、将系统安装到设备上正常运行; 2、设备网络调试,可以正常访问外网; 3、配置ssh服务(非必要,根据实际情况)。 说明: 本文档所提供的指引和参考主要基于特定实践…...

【RabbitMQ】RabbitMQ配置与交换机学习

【RabbitMQ】RabbitMQ配置与交换机学习 文章目录 【RabbitMQ】RabbitMQ配置与交换机学习简介安装和部署1. 安装RabbitMQ2.创建virtual-host3. 添加依赖4.修改配置文件 WorkQueues模型1.编写消息发送测试类2.编写消息接收&#xff08;监听&#xff09;类3. 实现能者多劳 交换机F…...

常见排序算法,快排,希尔,归并,堆排

后面的排序中都要用到的函数 //交换 void Swap(int* p1, int* p2) {int* tmp *p1;*p1 *p2;*p2 tmp; } 包含的头文件 "Sort.h" #pragma once #include<stdio.h> #include<stdlib.h> #include<assert.h> #include<time.h> #include<s…...

语法的时态1——一般现在时(1)

定义&#xff1a;一般现在时用来表示经常发生的动作&#xff0c;以及客观事实。 一般现在时的构成以及标志词 1.一般现在时的结构 &#xff08;1&#xff09;主系表结构 构成&#xff1a;主语be(am,is,ear)其他。属于状态句。 I…...

JAVA:在IDEA引入本地jar包的方法并解决打包scope为system时发布无法打包进lib的方案

一.引入本地Jar包的步骤 有时maven依耐的包是本地的jar包&#xff0c;此时需要进行以下步骤设置。 步骤1.在pom.xml中添加插件设置,将system范围包含进来&#xff0c;此设置是为了在打包时&#xff0c;本地jar包自动生成到部署包里。(若无法打进包&#xff0c;请参考下文的方…...

Hadoop3:MapReduce源码解读之Map阶段的CombineFileInputFormat切片机制(4)

Job那块的断点代码截图省略&#xff0c;直接进入切片逻辑 参考&#xff1a;Hadoop3&#xff1a;MapReduce源码解读之Map阶段的Job任务提交流程&#xff08;1&#xff09; 6、CombineFileInputFormat原理解析 类的继承关系 与TextInputFormat切片机制的区别 框架默认的TextI…...

GPT-4o:OpenAI的最新篇章与深度探索

引言&#xff1a; 在人工智能领域&#xff0c;自然语言处理&#xff08;NLP&#xff09;技术持续引领着技术创新的步伐。自2023年引入以来&#xff0c;GPT系列模型一直以其卓越的语言生成能力而闻名&#xff0c;近期的迭代——GPT-4o&#xff0c;更是为这一领域的研究和应用带…...

可以自己做logo的网站/网站优化的方法有哪些

直接截图了&#xff0c;前后接近一个月了&#xff0c;到目前&#xff0c;还是无法解决一个签名档的问题&#xff0c;百度的效率&#xff0c;无语。。。...

景区网站做电子商务的特点/国外推广网站有什么

展开全部交通灯程序网上很多&#xff0c;你自己稍微搜一下就可以获得&#xff0c;下载下来32313133353236313431303231363533e4b893e5b19e31333431343664稍加改动就可以了。给你一个例程自己修改吧。#include#define uchar unsigned char#define uint unsigned intsbit dulaP2^…...

网站被挂黑链排名降权/搜狗推广助手

优化的过程就是将图素归类,使用laya发布之后,会将对应文件夹的所有图素都打包成一个图集,当然,这个也是可选是否打包 1:首先做好图素分类,比如有个通天塔的功能,里面的图素只有在通天塔界面使用,就可以归类为一个文件夹,取名为babel,设置成打包,使用layabox发布之后,就会生成一…...

哪些网站是做零售的/河北seo基础

简介 此插件是 Malihu jQuery Scrollbar 为了在 Angular2 环境下使用&#xff0c;封装的一个ts的版本。提供directive和service。 从安装量来看&#xff0c;它比不过 perfect-scrollbar&#xff0c;所以我最后也没用它。但是也记录一下用法&#xff0c;万一以后要用呢&#xff…...

黄浦建设机械网站/站长之家站长工具综合查询

闭包&#xff08;closure&#xff09;是函数式编程的重要的语法结构&#xff0c;Python也支持这一特性&#xff0c;下面就开始介绍Python中的闭包。 首先看看闭包的概念&#xff1a;闭包&#xff08;Closure&#xff09;是词法闭包&#xff08;Lexical Closure&#xff09;的简…...

wordpress主题 搜索/收录网

一、去除 \ufeff encodingutf-8-sig 任何东西要存储在计算机中都要编码&#xff0c;视频&#xff0c;音频&#xff0c;文本&#xff0c;所以有时候&#xff0c;我们在打开一个视频的时候会遇到解码错误&#xff0c;不能播放&#xff0c;就是因为我们要将存储在计算机中的东西…...