如何为Kafka加上账号密码(二)
认证策略SASL/PLAIN
上篇文章中我们讲解了Kafka认证方式和基础概念,并比较了不同方式的使用场景。
我们在《2024年了,如何更好的搭建Kafka集群?》中集群统一使用PLAINTEXT通信。Kafka通常是在内网使用,但也有特殊的使用场景需要暴漏到公网上,如果未设置认证的Kafka集群允许通过公网访问,或暴漏给全部研发人员是极不安全的方式。
本小节我们就为Kafka添加最简单的认证方式,也就是SASL_PLAINTEXT(即SASL/PLAIN+ 非加密通道)。
配置服务集群节点
Kafka有三个地方可以做认证:
borker节点之间的认证、controller节点间的认证、外部客户端连接集群认证。公司内部使用我们只需要配置外部客户端连接时认证就可以了。本文基于kraft单节点部署Kafka,该节点既是controller又是broker节点。
在开始前,请先前往官网下载安装包(本文使用 Kafka-v3.6.1):
https://kafka.apache.org/downloads
1. 创建jaas配置文件
文件:volume/config/kafka-server-jaas.conf
KafkaServer {org.apache.kafka.common.security.plain.PlainLoginModule requireduser_admin="123456";
};
账号密码以user_{USERNAME}="PASSWORD"这种形式配置,固定写死在jaas的配置文件中。
2. 修改配置文件
修改config/kraft/server.properties如下:
# 修改listeners和advertised
listeners=PLAINTEXT://:9092,CONTROLLER://:9093,SASL_PLAINTEXT://:19092
advertised.listeners=PLAINTEXT://192.168.56.103:9092,SASL_PLAINTEXT://192.168.56.103:19092
# 添加mechanisms配置
sasl.enabled.mechanisms=PLAIN
虽然该实验我们只有一个节点,但我们按照集群的方式分配端口:
- 9092:用于broker节点内部通讯和内网客户端通信
- 19092:用于broker节点外部通讯和外网客户端通信
- 9093:用于多个controller节点之间的通信
我们只需要对外部流量做认证即可,也就是所有通过19092端口的通信需要经过认证。
配置文件中有三处需要注意:
sasl.enabled.mechanisms=PLAIN意思是采用PLAIN这种方式做认证。- listeners中的SASL_PLAINTEXT监听器,表示监听19092端口,并对经过该端口的通信做认证。
- advertised.listeners中的SASL_PLAINTEXT监听器,表示仅允许符合
192.168.56.103:19092的流量包抵达SASL_PLAINTEXT监听器。
关于advertised.listeners,Kafka有一套复杂的监听器系统。Kafka允许定义多个监听器监听不同端口,将不同的流量划分到不同的端口。这样的好处是当一个端口流量较大时,不影响其它端口的通信。
3. 启动节点
执行下面这条命令启动节点:
KAFKA_OPTS=-Djava.security.auth.login.config=config/kraft/jaas.conf bin/kafka-server-start.sh config/kraft/server.properties
环境变量KAFKA_OPTS用于指定JAAS配置文件。
经过上面3个步骤服务端就配置好了,下面我们使用客户端验证认证是否生效。
配置客户端
我们将介绍三种客户端连接方式:
- kafka命令工具测试
- offset explorer图形工具连接kafka
- flink连接kafka
kafka命令工具测试
在config目录中创建admin.conf
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="123456";
提示:确保账号密码与服务端配置一致
# 创建topic
bin/kafka-topics.sh --create --topic test-topic --bootstrap-server=192.168.56.103:19092 --command-config=config/admin.conf
# 查看topic
bin/kafka-topics.sh --list --bootstrap-server=192.168.56.103:19092 --command-config=config/admin.conf
使用图形工具Offset explorer连接kafka
![[attach/Pasted image 20240204135019.png]]
此处只需要填写好集群名称即可,我们没有用zk版,zookeeper相关配置无需修改。
![[attach/Pasted image 20240127151018.png]]
![[attach/Pasted image 20240127151059.png]]
![[attach/Pasted image 20240127151109.png]]
这三处设置好,就可以正常连接了。
Flink连接kafka
在无认证的基础上,额外添加三个认证参数:
SECURITY_PROTOCOL_CONFIGSASL_MECHANISMSASL_JAAS_CONFIG
Properties properties = new Properties();properties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");properties.setProperty(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "5");properties.setProperty(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");properties.setProperty(SaslConfigs.SASL_MECHANISM, "PLAIN");properties.setProperty(SaslConfigs.SASL_JAAS_CONFIG, "org.apache.kafka.common.security.plain.PlainLoginModule required username='admin' password='123456';");/*org.apache.flink.streaming包下面的消费者和生产者已被标记不建议使用org.apache.flink.connector.kafka 推荐使用该包中的工具类*/KafkaSource<String> kafkaSource = KafkaSource.<String>builder().setBootstrapServers("192.168.56.103:19092").setTopics("test-topic").setGroupId("test").setValueOnlyDeserializer(new SimpleStringSchema()).setStartingOffsets(OffsetsInitializer.earliest()).setProperties(properties).build();final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.enableCheckpointing(10000);env.setParallelism(1);DataStreamSource<String> stream = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "test-tip");stream.print();env.execute("Kafka to Print");
我们已经为Kafka集群配置好SASL/PLAIN的认证方式。但这个方式的缺点也是显而易见,它只能在启动Kafka前,固定填写好用户名和密码,无法做到灵活修改账号密码。若要修改,只能重启集群。下一篇文章,我们介绍一种可以灵活修改的认证方式,即SASL/SCRAM。
转载请保留本文连接
相关文章:
如何为Kafka加上账号密码(二)
认证策略SASL/PLAIN 上篇文章中我们讲解了Kafka认证方式和基础概念,并比较了不同方式的使用场景。 我们在《2024年了,如何更好的搭建Kafka集群?》中集群统一使用PLAINTEXT通信。Kafka通常是在内网使用,但也有特殊的使用场景需要…...
【大数据】Flink on YARN,如何确定 TaskManager 数
Flink on YARN,如何确定 TaskManager 数 1.问题2.并行度(Parallelism)3.任务槽(Task Slot)4.确定 TaskManager 数 1.问题 在 Flink 1.5 Release Notes 中,有这样一段话,直接上截图。 这说明从 …...
ES节点故障的容错方案
ES节点故障的容错方案 1. es启动加载逻辑1.1 segment和translg组成和分析1.2 es节点启动流程1.3 es集群的初始化和启动过程 2. master高可用2.1 选主逻辑2.1.1 过滤选主的节点列表2.1.2 Bully算法2.1.2 类Raft协议2.1.3 元数据合并 2.2 HA切换 3. 分片高可用3.1 集群分片汇报3.…...
【Flink】FlinkSQL实现数据从Kafka到MySQL
简介 未来Flink通用化,代码可能就会转换为sql进行执行,大数据开发工程师研发Flink会基于各个公司的大数据平台或者通用的大数据平台,去提交FlinkSQL实现任务,学习Flinksql势在必行。 本博客在sql-client中模拟大数据平台的sql编辑器执行FlinkSQL,使用Flink实现数据从Kafka传…...
Unity GC
本文由 简悦 SimpRead 转码, 原文地址 mp.weixin.qq.com 简略版本 在 Unity 中,垃圾回收(Garbage Collection,GC)采用的是基于标记-清除(Mark and Sweep)算法的自动内存管理机制。 基于标记-清…...
Vue源码系列讲解——变化侦测篇【下】(Array的变化侦测)
目录 1. 前言 2. 在哪里收集依赖 3. 使Array型数据可观测 3.1 思路分析 3.2 数组方法拦截器 3.3 使用拦截器 4. 再谈依赖收集 4.1 把依赖收集到哪里 4.2 如何收集依赖 4.3 如何通知依赖 5. 深度侦测 6. 数组新增元素的侦测 7. 不足之处 8. 总结 1. 前言 上一篇文…...
【机器学习笔记】贝叶斯学习
贝叶斯学习 文章目录 贝叶斯学习1 贝叶斯学习背景2 贝叶斯定理3 最大后验假设MAP(Max A Posterior)4 极大似然假设ML(Maximum Likelihood)5 朴素贝叶斯NB6 最小描述长度MDL 1 贝叶斯学习背景 试图发现两件事情的关系(因果关系,先决条件&结论&#x…...
ElasticSearch之倒排索引
写在前面 本文看下es的倒排索引相关内容。 1:正排索引和倒排索引 正排索引就是通过文档id找文档内容,而倒排索引就是通过文档内容找文档id,如下图: 2:倒排索引原理 假定我们有如下的数据: 为了建立倒…...
win11安装mysql8.3.0压缩包版 240206
mysql社区版安装包版windows安装包下载地址 在系统环境变量path无点.的情况下 powershell 可以 .\ 或 ./ 开头表示当前文件夹cmd 可以直接命令或.\开头, 不能./开头 所以 .\ 在cmd和powershell中通用 步骤 在解压目录 .\mysqld --initialize-insecure root无密码初始化.\m…...
数据库索引与优化:深入了解索引的种类、使用与优化
数据库索引与优化:深入了解索引的种类、使用与优化 索引的种类 数据库索引是提高查询速度的重要手段之一,主要分为以下几种类型: 主键索引(Primary Key Index): 唯一标识表中的每一行数据,保…...
React 错误边界组件 react-error-boundary 源码解析
文章目录 捕获错误 hook创建错误边界组件 Provider定义错误边界组件定义边界组件状态捕捉错误渲染备份组件重置组件通过 useHook 控制边界组件 捕获错误 hook getDerivedStateFromError 返回值会作为组件的 state 用于展示错误时的内容 componentDidCatch 创建错误边界组件 P…...
分享66个相册特效,总有一款适合您
分享66个相册特效,总有一款适合您 66个相册特效下载链接:https://pan.baidu.com/s/1jqctaho4sL_iGSNExhWB6A?pwd8888 提取码:8888 Python采集代码下载链接:采集代码.zip - 蓝奏云 学习知识费力气,收集整理更不…...
chagpt的原理详解
GPT(Generative Pre-trained Transformer)是一种基于Transformer架构的生成式预训练模型。GPT-3是其中的第三代,由OpenAI开发。下面是GPT的基本原理: Transformer架构: GPT基于Transformer架构,该架构由Att…...
dockerfile 详细讲解
当编写 Dockerfile 时,你需要考虑你的应用程序所需的环境和依赖项,并将其描述为一系列指令。下面是一个简单的示例,演示如何编写一个用于部署基于 Node.js 的网站的 Dockerfile: Dockerfile # 使用官方 Node.js 镜像作为基础镜像…...
跟着pink老师前端入门教程-day23
苏宁网首页案例制作 设置视口标签以及引入初始化样式 <meta name"viewport" content"widthdevice-width, user-scalableno, initial-scale1.0, maximum-scale1.0, minimum-scale1.0"> <link rel"stylesheet" href"css/normaliz…...
JRT监听程序
本次设计避免以往设计缺陷,老的主要为了保持兼容性,在用的设计就不好调了。 首先,接口抽象时候就不在给参数放仪器ID和处理类了,直接放仪器配置实体,接口实现想用什么属性就用什么属性,避免老方式要扩参数时…...
MCU+SFU视频会议一体化,视频监控,指挥调度(AR远程协助)媒体中心解决方案。
视频互动应用已经是政务和协同办公必备系统,早期的分模块,分散的视频应该不能满足业务需要,需要把视频监控,会议,录存一体把视频资源整合起来,根据客户需求,需要能够多方视频互动,直…...
1184. 欧拉回路(欧拉回路,模板题)
活动 - AcWing 给定一张图,请你找出欧拉回路,即在图中找一个环使得每条边都在环上出现恰好一次。 输入格式 第一行包含一个整数 t,t∈{1,2},如果 t1,表示所给图为无向图,如果 t2,表示所给图为…...
学习 Redis 基础数据结构,不讲虚的。
学习 Redis 基础数据结构,不讲虚的。 一个群友给我发消息,“该学的都学了,怎么就找不到心意的工作,太难了”。 很多在近期找过工作的同学一定都知道了,背诵八股文已经不是找工作的绝对王牌。企业最终要的是可以创造价…...
Android 11 webview webrtc无法使用问题
问题:Android 11 webview 调用webrtc无法使用, 看logcat日志会报如下错误 [ERROR:address_tracker_linux.cc(245)] Could not send NETLINK request: Permission denied (13) 查了下相关的网络权限都有配置了还是不行,还是报这个权限问题 原因࿱…...
Python|GIF 解析与构建(5):手搓截屏和帧率控制
目录 Python|GIF 解析与构建(5):手搓截屏和帧率控制 一、引言 二、技术实现:手搓截屏模块 2.1 核心原理 2.2 代码解析:ScreenshotData类 2.2.1 截图函数:capture_screen 三、技术实现&…...
IGP(Interior Gateway Protocol,内部网关协议)
IGP(Interior Gateway Protocol,内部网关协议) 是一种用于在一个自治系统(AS)内部传递路由信息的路由协议,主要用于在一个组织或机构的内部网络中决定数据包的最佳路径。与用于自治系统之间通信的 EGP&…...
鸿蒙DevEco Studio HarmonyOS 5跑酷小游戏实现指南
1. 项目概述 本跑酷小游戏基于鸿蒙HarmonyOS 5开发,使用DevEco Studio作为开发工具,采用Java语言实现,包含角色控制、障碍物生成和分数计算系统。 2. 项目结构 /src/main/java/com/example/runner/├── MainAbilitySlice.java // 主界…...
SAP学习笔记 - 开发26 - 前端Fiori开发 OData V2 和 V4 的差异 (Deepseek整理)
上一章用到了V2 的概念,其实 Fiori当中还有 V4,咱们这一章来总结一下 V2 和 V4。 SAP学习笔记 - 开发25 - 前端Fiori开发 Remote OData Service(使用远端Odata服务),代理中间件(ui5-middleware-simpleproxy)-CSDN博客…...
代码随想录刷题day30
1、零钱兑换II 给你一个整数数组 coins 表示不同面额的硬币,另给一个整数 amount 表示总金额。 请你计算并返回可以凑成总金额的硬币组合数。如果任何硬币组合都无法凑出总金额,返回 0 。 假设每一种面额的硬币有无限个。 题目数据保证结果符合 32 位带…...
推荐 github 项目:GeminiImageApp(图片生成方向,可以做一定的素材)
推荐 github 项目:GeminiImageApp(图片生成方向,可以做一定的素材) 这个项目能干嘛? 使用 gemini 2.0 的 api 和 google 其他的 api 来做衍生处理 简化和优化了文生图和图生图的行为(我的最主要) 并且有一些目标检测和切割(我用不到) 视频和 imagefx 因为没 a…...
MySQL 知识小结(一)
一、my.cnf配置详解 我们知道安装MySQL有两种方式来安装咱们的MySQL数据库,分别是二进制安装编译数据库或者使用三方yum来进行安装,第三方yum的安装相对于二进制压缩包的安装更快捷,但是文件存放起来数据比较冗余,用二进制能够更好管理咱们M…...
【C++特殊工具与技术】优化内存分配(一):C++中的内存分配
目录 一、C 内存的基本概念 1.1 内存的物理与逻辑结构 1.2 C 程序的内存区域划分 二、栈内存分配 2.1 栈内存的特点 2.2 栈内存分配示例 三、堆内存分配 3.1 new和delete操作符 4.2 内存泄漏与悬空指针问题 4.3 new和delete的重载 四、智能指针…...
全面解析数据库:从基础概念到前沿应用
在数字化时代,数据已成为企业和社会发展的核心资产,而数据库作为存储、管理和处理数据的关键工具,在各个领域发挥着举足轻重的作用。从电商平台的商品信息管理,到社交网络的用户数据存储,再到金融行业的交易记录处理&a…...
Spring Boot + MyBatis 集成支付宝支付流程
Spring Boot MyBatis 集成支付宝支付流程 核心流程 商户系统生成订单调用支付宝创建预支付订单用户跳转支付宝完成支付支付宝异步通知支付结果商户处理支付结果更新订单状态支付宝同步跳转回商户页面 代码实现示例(电脑网站支付) 1. 添加依赖 <!…...
