中国十大门窗品牌排行榜前十名/保定seo排名外包
随着数据驱动决策的重要性日益凸显,实时数据处理成为企业竞争力的关键。SeaTunnel MongoDB CDC(Change Data Capture) 源连接器的推出,为开发者提供了一个高效、灵活的工具,以实现对 MongoDB 数据库变更的实时捕获和处理。
本文将深入探讨该连接器的主要特性、支持的数据源信息、配置选项以及如何创建数据同步作业,助力开发者更好地利用 SeaTunnel 进行数据集成和实时数据分析。这些更新旨在为开发者提供更为丰富的数据处理能力,帮助他们更有效地捕获和处理来自 MongoDB 的变更数据。
支持的引擎
SeaTunnel Zeta
Flink
主要特性
- 批处理
- 流处理
- 精确一次
- 列投影
- 并行度
- 支持用户定义分片
功能描述
MongoDB CDC 源连接器允许从 MongoDB 数据库读取快照数据和增量数据。
支持的数据源信息
要使用 MongoDB CDC 连接器,需要以下依赖。它们可以通过 install-plugin.sh 脚本或从 Maven 中央仓库下载。
数据源 | 支持的版本 | 依赖 |
---|---|---|
MongoDB | 通用 | 下载 |
可用性设置
- MongoDB版本:MongoDB 版本 >= 4.0。
- 集群部署:副本集或分片集群。
- 存储引擎:WiredTiger 存储引擎。
- 权限:changeStream 和 read
use admin;
db.createRole({role: "strole",privileges: [{resource: { db: "", collection: "" },actions: ["splitVector","listDatabases","listCollections","collStats","find","changeStream" ]}],roles: [{ role: 'read', db: 'config' }]}
);db.createUser({user: 'stuser',pwd: 'stpw',roles: [{ role: 'strole', db: 'admin' }]}
);
数据类型映射
以下表格列出了从 MongoDB BSON 类型到 SeaTunnel 数据类型的字段数据类型映射。
MongoDB BSON 类型 | SeaTunnel 数据类型 |
---|---|
ObjectId | STRING |
String | STRING |
Boolean | BOOLEAN |
Binary | BINARY |
Int32 | INTEGER |
Int64 | BIGINT |
Double | DOUBLE |
Decimal128 | DECIMAL |
Date | DATE |
Timestamp | TIMESTAMP |
Object | ROW |
Array | ARRAY |
对于 MongoDB 中的特定类型,我们使用扩展 JSON 格式将它们映射到 SeaTunnel STRING 类型。
MongoDB BSON 类型 | SeaTunnel STRING 表示 |
---|---|
Symbol | {"_value": {"$symbol": "12"}} |
RegularExpression | {"_value": {"$regularExpression": {"pattern": "^9$", "options": "i"}}} |
JavaScript | {"_value": {"$code": "function() { return 10; }"}} |
DbPointer | {"_value": {"$dbPointer": {"$ref": "db.coll", "$id": {"$oid": "63932a00da01604af329e33c"}}}} |
提示
在 SeaTunnel 中使用 DECIMAL 类型时,请注意最大范围不能超过 34 位数字,这意味着你应该使用 decimal(34, 18)。
名称 | 类型 | 必须 | 默认值 | 描述 |
---|---|---|---|---|
hosts | String | 是 | - | MongoDB 服务器的主机名和端口对的逗号分隔列表。例如:localhost:27017,localhost:27018 |
username | String | 否 | - | 连接 MongoDB 时使用的数据库用户名。 |
password | String | 否 | - | 连接 MongoDB 时使用的密码。 |
database | List | 是 | - | 要监视更改的数据库名称。如果未设置,则会捕获所有数据库。数据库还支持正则表达式,以监视与正则表达式匹配的多个数据库。例如:db1,db2。 |
collection | List | 是 | - | 数据库中要监视更改的集合名称。如果未设置,则会捕获所有集合。集合也支持正则表达式,以监视与完全限定的集合标识符匹配的多个集合。例如:db1.coll1,db2.coll2。 |
connection.options | String | 否 | - | MongoDB 的连接选项的和号分隔列表。例如:replicaSet=test&connectTimeoutMS=300000。 |
batch.size | Long | 否 | 1024 | 游标批大小。 |
poll.max.batch.size | Enum | 否 | 1024 | 轮询新数据时包含在单个批次中的更改流文档的最大数量。 |
poll.await.time.ms | Long | 否 | 1000 | 等待检查更改流上的新结果之前的时间量。 |
heartbeat.interval.ms | String | 否 | 0 | 发送心跳消息之间的时间长度(以毫秒为单位)。使用 0 禁用。 |
incremental.snapshot.chunk.size.mb | Long | 否 | 64 | 增量快照的块大小(MB)。 |
common-options | 否 | - | 源插件通用参数,请参考源通用选项获取详情。 |
提示:
- 如果集合变更速度较慢,强烈建议为 heartbeat.interval.ms 参数设置大于 0 的适当值。当我们从检查点或保存点恢复 SeaTunnel 作业时,心跳事件可以将 resumeToken 推进以避免其过期。
- MongoDB 对单个文档有 16MB 的限制。更改文档包括附加信息,因此即使原始文档不大于 15MB,更改文档也可能超过 16MB 限制,导致更改流操作终止。
- 建议使用不可变的分片键。在 MongoDB 中,分片键在启用事务后允许修改,但更改分片键可能导致频繁的分片迁移,造成额外的性能开销。此外,修改分片键还可能导致更新查找功能变得无效,在 CDC(更改数据捕获)场景中导致不一致的结果。
如何创建 MongoDB CDC 数据同步作业
将 CDC 数据打印到客户端
以下示例演示如何创建一个从 MongoDB 读取 CDC 数据并在本地客户端打印的数据同步作业:
env {# 您可以在此处设置引擎配置parallelism = 1job.mode = "STREAMING"checkpoint.interval = 5000
}source {MongoDB-CDC {hosts = "mongo0:27017"database = ["inventory"]collection = ["inventory.products"]username = stuserpassword = stpwschema = {fields {"_id" : string,"name" : string,"description" : string,"weight" : string}}}
}# 在本地客户端打印读取的 MongoDB 数据
sink {Console {parallelism = 1}
}
将 CDC 数据写入 MysqlDB
以下示例演示如何创建一个从 MongoDB 读取 CDC 数据并写入 mysql 数据库的数据同步作业:
env {# You can set engine configuration hereparallelism = 1job.mode = "STREAMING"checkpoint.interval = 5000
}source {MongoDB-CDC {hosts = "mongo0:27017"database = ["inventory"]collection = ["inventory.products"]username = stuserpassword = stpw}
}sink {jdbc {url = "jdbc:mysql://mysql_cdc_e2e:3306"driver = "com.mysql.cj.jdbc.Driver"user = "st_user"password = "seatunnel"generate_sink_sql = true# You need to configure both database and tabledatabase = mongodb_cdctable = productsprimary_keys = ["_id"]}
}
多表同步
以下示例演示如何创建一个读取 mongodb 多库表 CDC 数据并在本地客户端打印的数据同步作业:
env {# You can set engine configuration hereparallelism = 1job.mode = "STREAMING"checkpoint.interval = 5000
}source {MongoDB-CDC {hosts = "mongo0:27017"database = ["inventory","crm"]collection = ["inventory.products","crm.test"]username = stuserpassword = stpw}
}# Console printing of the read Mongodb data
sink {Console {parallelism = 1}
}
提示: 多库表 CDC 同步不能指定 schema,只能下游输出 json 数据。这是因为 MongoDB 不提供查询元数据信息,所以如果想支持多表,所有表只能作为一个结构读取。
使用正则表达式匹配多表
以下示例演示如何创建一个通过正则表达式读取 mongodb 多库表数据并在本地客户端打印的数据同步作业:
匹配示例 | 表达式 | 描述 |
---|---|---|
前缀匹配 | ^(test).* | 匹配数据库名或表名以 test 为前缀的,如 test1, test2 等。 |
后缀匹配 | .*[p$] | 匹配数据库名或表名以 p 为后缀的,如 cdcp, edcp 等。 |
``` | ||
env { | ||
# You can set engine configuration here | ||
parallelism = 1 | ||
job.mode = "STREAMING" | ||
checkpoint.interval = 5000 | ||
} |
source { MongoDB-CDC { hosts = "mongo0:27017" # So this example is used (^(test).|^(tpc).|txc|.[p$]|t{2}).(t[5-8]|tt),matching txc.tt、test2.test5. database = ["(^(test).|^(tpc).|txc|.[p$]|t{2})"] collection = ["(t[5-8]|tt)"] username = stuser password = stpw } }
Console printing of the read Mongodb data
sink { Console { parallelism = 1 } }
### 实时流数据格式
{ _id : { }, // Identifier of the open change stream, can be assigned to the 'resumeAfter' parameter for subsequent resumption of this change stream "operationType" : " ", // The type of change operation that occurred, such as: insert, delete, update, etc. "fullDocument" : { }, // The full document data involved in the change operation. This field does not exist in delete operations "ns" : {
"db" : " ", // The database where the change operation occurred "coll" : " " // The collection where the change operation occurred }, "to" : { // These fields are displayed only when the operation type is 'rename' "db" : " ", // The new database name after the change "coll" : " " // The new collection name after the change }, "source":{ "ts_ms":" ", // The timestamp when the change operation occurred "table":" " // The collection where the change operation occurred "db":" ", // The database where the change operation occurred "snapshot":"false" // Identify the current stage of data synchronization }, "documentKey" : { "_id" : }, // The _id field value of the document involved in the change operation "updateDescription" : { // Description of the update operation "updatedFields" : { }, // The fields and values that the update operation modified "removedFields" : [ " ", ... ] // The fields and values that the update operation removed } "clusterTime" : , // The timestamp of the Oplog log entry corresponding to the change operation "txnNumber" : , // If the change operation is executed in a multi-document transaction, this field and value are displayed, representing the transaction number "lsid" : { // Represents information related to the Session in which the transaction is located "id" : , "uid" : } }
```
到此本指南就结束了,MongoDB CDC Sink连接器的发布,不仅强化了 Apache SeaTunnel 在数据集成领域的地位,也为开发者提供了更多的可能性。
Apache SeaTunnel 社区也期待您的参与和贡献,共同迈向更广阔的数据处理未来,让我们携手共建一个更加强大、开放、互助的社区!
本文由 白鲸开源科技 提供发布支持!
相关文章:

Apache SeaTunnel MongoDB CDC 使用指南
随着数据驱动决策的重要性日益凸显,实时数据处理成为企业竞争力的关键。SeaTunnel MongoDB CDC(Change Data Capture) 源连接器的推出,为开发者提供了一个高效、灵活的工具,以实现对 MongoDB 数据库变更的实时捕获和处理。 本文将深入探讨该连…...

智能合约 之 部署ERC-20
Remix介绍 Remix是一个由以太坊社区开发的在线集成开发环境(IDE),旨在帮助开发者编写、测试和部署以太坊智能合约。它提供了一个简单易用的界面,使得开发者可以在浏览器中直接进行智能合约的开发,而无需安装任何额外的…...

【C++】用红黑树模拟实现set、map
目录 前言及准备:一、红黑树接口1.1 begin1.2 end1.3 查找1.4 插入1.5 左单旋和右单旋 二、树形迭代器(正向)2.1 前置 三、模拟实现set四、模拟实现map 前言及准备: set、map的底层结构是红黑树,它们的函数通过调用红…...

实现:mysql-5.7.42 到 mysql-8.2.0 的升级(二进制方式)
实现:mysql-5.7.42 到 mysql-8.2.0 的升级(二进制方式) 1、操作环境1、查看当前数据库版本2、操作系统版本3、查看 Linux 系统上的 glibc(GNU C 库)版本(**这里很重要,要下载对应的内核mysql版本…...

深入探讨医保购药APP的技术架构与设计思路
随着移动互联网的发展,医疗保健行业也迎来了数字化转型的浪潮。医保购药APP作为医保体系数字化的一部分,其技术架构和设计思路至关重要。接下来,小编将为您讲解医保购药APP的技术架构与设计思路,为相关从业者提供参考和启发。 一、…...

react中点击按钮不能获取最新的state时候
在这个问题中,用户希望在点击确认按钮时触发handleChange函数,并且能够正确获取到最新的bzText值。最初的代码中,在handleOpen函数中弹出一个确认框,并在确认框的onOk回调函数中调用handleChange函数。然而,由于组件传…...

2、鸿蒙学习-申请调试证书和调试Profile文件
申请发布证书 发布证书由AGC颁发的、为HarmonyOS应用配置签名信息的数字证书,可保障软件代码完整性和发布者身份真实性。证书格式为.cer,包含公钥、证书指纹等信息。 说明 请确保您的开发者帐号已实名认证。每个帐号最多申请1个发布证书。 1、登录AppGa…...

蓝桥杯算法基础(13):十大排序算法(希尔排序) (快速排序)c语言版
希尔排序 优化版的插入排序,优化的地方就是步长(增量)增大了,原来的插入排序的步长(增量)是1,而希尔排序的步长(增量)可以很大,然后逐渐减小直到1形成插入排…...

web学习笔记(三十二)
目录 1.函数的call、apply、bind方法 1.1call、apply、bind的相同点 1.2call、apply、bind的不同点 1.3call、apply、bind的使用场景 2. 对象的深拷贝 2.1对象的浅拷贝 2.1对象的深拷贝 1.函数的call、apply、bind方法 1.1call、apply、bind的相同点 在没有传参数时&…...

Android 地图SDK 绘制点 删除 指定
问题 Android 地图SDK 删除指定绘制点 详细问题 笔者进行Android 项目开发,对于已标记的绘制点,提供撤回按钮,即删除绘制点,如何实现。 解决方案 新增绘制点 private List<Marker> markerList new ArrayList<>…...

Nodejs 第五十八章(大文件上传)
在现代网站中,越来越多的个性化图片,视频,去展示,因此我们的网站一般都会支持文件上传。 文件上传的方案 大文件上传:将大文件切分成较小的片段(通常称为分片或块),然后逐个上传这…...

Linux编译器--gcc/g++的使用
1. gcc与g gcc与g分别是c语言与c代码的编译器,但同时g也兼容c语言。 我们知道在Linux中,系统并不以文件后缀来区分文件类别。但对于gcc与g等编译器而言却是需要的。Linux中c代码文件的后缀是.c,c代码文件的后缀是.cpp(.cc)(.cxx)。 在Linu…...

苍穹外卖-day13:vue基础回顾+进阶
vue基础回顾进阶 课程内容 VUE 基础回顾路由 Vue-Router状态管理 vuexTypeScript 1. VUE 基础回顾 1.1 基于脚手架创建前端工程 1.1.1 环境要求 要想基于脚手架创建前端工程,需要具备如下环境要求: node.js 前端项目的运行环境 学习web阶段已安…...

蓝桥杯/慈善晚会/c\c++
问题描述 热心公益的G哥哥又来举办慈善晚会了,这次他邀请到了巴菲特、马云等巨富,还邀请到了大V、小C等算法界泰斗。晚会一共邀请了n位尊贵的客人,每位客人都位于不同的城市,也就是说每座城市都有且仅有一位客人。这些城市的编号为…...

2024.3.19
思维导图...

【Python】新手入门学习:详细介绍单一职责原则(SRP)及其作用、代码示例
【Python】新手入门学习:详细介绍单一职责原则(SRP)及其作用、代码示例 🌈 个人主页:高斯小哥 🔥 高质量专栏:Matplotlib之旅:零基础精通数据可视化、Python基础【高质量合集】、PyT…...

【DataWhale学习笔记】使用AgentScope调用qwen大模型
AgentScope AgentScope介绍 AgentScope是一款全新的Multi-Agent框架,专为应用开发者打造,旨在提供高易用、高可靠的编程体验! 高易用:AgentScope支持纯Python编程,提供多种语法工具实现灵活的应用流程编排ÿ…...

【C++】手撕AVL树
> 作者简介:დ旧言~,目前大二,现在学习Java,c,c,Python等 > 座右铭:松树千年终是朽,槿花一日自为荣。 > 目标:能直接手撕AVL树。 > 毒鸡汤:放弃自…...

探索 TorchRe-ID--基于 Python 的人员再识别库
导言 人员再识别(re-ID)是计算机视觉中的一项重要任务,在监控系统、零售分析和人机交互中有着广泛的应用。TorchRe-ID 是一个功能强大、用户友好的 Python 库,它为人员再识别任务提供了一套全面的工具和模型。在本文中࿰…...

鸿蒙Harmony应用开发—ArkTS声明式开发(容器组件:Flex)
以弹性方式布局子组件的容器组件。 说明: 该组件从API Version 7开始支持。后续版本如有新增内容,则采用上角标单独标记该内容的起始版本。Flex组件在渲染时存在二次布局过程,因此在对性能有严格要求的场景下建议使用Column、Row代替。Flex组…...

tmux最基础的一点应用-不用一直挂着ssh,可以干点别的事情
文章目录 使用原因基础命令创建一个窗口退出当前窗口重新进入万一忘记了环境名字想要删除环境 使用原因 跑程序要很久,需要干别的事情,电脑不能一直开,可以使用tmux来管理。 基础命令 创建一个窗口 tmux new -s <你自己起的环境名字&g…...

Java推荐算法——特征加权推荐算法(以申请学校为例)
加权推荐算法 文章目录 加权推荐算法1.推荐算法的简单介绍2.加权推荐算法详细介绍3.代码实现4.总结 1.推荐算法的简单介绍 众所周知,推荐算法有很多种,例如: 1.加权推荐:分为简单的特征加权,以及复杂的混合加权。主要…...

探索什么便签软件好用,可以和手机同步的便签软件
在信息技术日新月异的今天,各类数字工具已经成为我们生活与工作的重要助手。便签软件作为一种简单却高效的辅助工具,悄然改变着人们的记录习惯与时间管理方式。而在诸多便签软件中,能够实现手机与电脑同步功能的产品尤显其独特的价值。那么&a…...

字符函数与字符串函数
前言 本次博客可以说内容最为多的一次博客,讲解同样很细致大家好好看看 1字符函数 在讲解字符函数时,大家得了解什么是字符吧 普通字符a b c 1 转义字符 \n 换行‘ \t’ 水平制表符\r回车 大家了解即可 在C语言中字符也可以有分类 所以我们先来看看…...

Kubernetes 项目整体布局 el-container
整体布局整体布局 你可能会去敲不同的项目,有很多种平台。那么其实都是可以复用的。唯一不同的就是main里面的内容是不同的,边框架子都是相同的。其实框架是不怎么变化的,变化的是main里面。 src/layout/Layout.vue 这里需要新增一个页面Lay…...

AI赋能写作:AI大模型高效写作一本通
❤️作者主页:小虚竹 ❤️作者简介:大家好,我是小虚竹。2022年度博客之星评选TOP 10🏆,Java领域优质创作者🏆,CSDN博客专家🏆,华为云享专家🏆,掘金年度人气作…...

unraid docker.img扩容
unraid 弹Docker image disk utilization of 99%,容器下载/更新失败 我的版本是6.11.5,docker.img满了导致容器不能更新,遇到同样问题的可以先用docker命令清除一下仓库(当然不一定能清理出来,我已经清理过只清理出来1G多点&…...

Python 实现1~100之间的偶数求和
result0 for i in range(101):if i%20:result result i print(result) 或者 result0 for i in range(2,101,2):result result i print(result)...

Leetcode 387. First Unique Character in a String
Problem Given a string s, find the first non-repeating character in it and return its index. If it does not exist, return -1. Algorithm Use two lists: one list is used to count the letters in “s”; the other list is the position where the letter first …...

c++ 自己实现一个迭代器
具体代码 /*自定义迭代器的实现 */ #include <iostream> using namespace std; class num {int val; //具体的数字int length; //数字的位数void calculate_length(){if(val/100){ //这个数字只有1位length1;return;}int x10; //这里就是不断重复除直…...