kafka配置SASL/PLAIN 安全认证
1 zookeeper配置启动
1.1 zookeeper添加SASL支持
为zookeeper添加SASL支持,在配置文件zoo.cfg添加
authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
requireClientAuthScheme=sasl
jaasLoginRenew=3600000
1.2 zk_server_jaas.conf文件
新建zk_server_jaas.conf文件,为Zookeeper添加账号认证信息.这个文件你放在哪里随意,只要后面zkEnv配置正确的路径就好了。我是放在/opt/zookeeper/conf/home路径下。zk_server_jaas.conf文件的内容如下
Server {
org.apache.kafka.common.security.plain.PlainLoginModule required
username=“cluster”
password=“clusterpasswd”
user_kafka=“kafkapasswd”;
};`
username和paasword是zk集群之间的认证密码。
user_kafka=“kafkapasswd"定义了一个用户"kafka”,密码是"kafkapasswd",本次测试用户是kafka broker。
1.3 引入jar包
由上一步可发现,认证方式使用的是Kafka的认证类org.apache.kafka.common.security.plain.PlainLoginModule。因此zk需要依赖几个jar包。
在home下新建zk_sasl_dependency目录,从kafka/lib目录下复制以下几个jar包到该目录下。根据kafka版本不同,几个jar包的版本可能不一样
-rw-r--r-- 1 root root 1893564 Aug 29 10:53 kafka-clients-2.0.0.jar
-rw-r--r-- 1 root root 489884 Aug 29 11:14 log4j-1.2.17.jar
-rw-r--r-- 1 root root 370137 Aug 29 10:53 lz4-java-1.4.1.jar
-rw-r--r-- 1 root root 41203 Aug 29 10:53 slf4j-api-1.7.25.jar
-rw-r--r-- 1 root root 12244 Aug 29 10:53 slf4j-log4j12-1.7.25.jar
-rw-r--r-- 1 root root 2019013 Aug 29 10:53 snappy-java-1.1.7.1.jar
[root@sm_qf-bj_hydgpt_192-168-151-168 home]#
1.4.修改zkEnv.sh
在zkEnv.sh添加
for i in /opt/zookeeper/conf/home/zk_sasl_dependency/*.jar
doCLASSPATH="$i:$CLASSPATH"
done
SERVER_JVMFLAGS=" -Djava.security.auth.login.config=/opt/zookeeper/conf/home/zk_server_jaas.conf "
1.5.启动zk服务端
执行./zkServer.sh restart重新启动zk。如果启动异常查看日志排查问题
2 kafka配置和启动
2.1.新建kafka_server_jaas.conf,为kafka添加认证信息
KafkaServer {org.apache.kafka.common.security.plain.PlainLoginModule requiredusername="cluster"password="cluster"user_cluster=“clusterpasswd”user_kafka="kafkapasswd" ;
};
Client{org.apache.kafka.common.security.plain.PlainLoginModule required username="kafka" password="kafkapasswd";
};
KafkaServer,第一行指定了认证方法为PLAIN,usernam和password是kafka的多个broker之间进行认证的账号密码。
user_kafka="kafkapasswd"设置了用户kafka,密码为kafkapswd,用于客户端的生产者和消费者连接认证。
网上的说法是 Client,是kafka作为用户使用zk的认证信息,这里的username和password一定要和zk_server_jaas.conf的配置对的上。
2. 2.在kafka的配置文件开启SASL认证
listeners=SASL_PLAINTEXT://(IP):9092
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.mechanism.inter.broker.protocol=PLAIN
sasl.enabled.mechanisms=PLAIN
allow.everyone.if.no.acl.found=true
2.3 .在server启动脚本JVM参数
export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"
改为
export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G -Djava.security.auth.login.config=/home/kafka_server_jaas.conf"
2.4 启动
./kafka-server-start.sh ../config/server.properties
2.5
kafka服务端正常启动后,应该会有类似下面这行的日志信息,说明认证功能开启成功
Registered broker 0 at path /brokers/ids/0 with addresses: EndPoint((IP),9092,ListenerName(SASL_PLAINTEXT),SASL_PLAINTEXT) (kafka.utils.ZkUtils)
3 kafka的SASL认证功能认证和使用
1.使用kafka脚本认证
我们使用kafka自带的脚本进行认证。
1.新建kafka_client_jaas.conf,为客户端添加认证信息
在/home下新建kafka_client_jaas.conf,添加以下信息
KafkaClient {org.apache.kafka.common.security.plain.PlainLoginModule requiredusername="kafka"password="kafkapasswd";
};
2.修改客户端配置信息
修改producer.properties和consumer.properties,添加认证机制
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
3.修改客户端启动脚本
修改kafka-console-producer.sh,配置认证文件kafka_client_jaas.conf,将
export KAFKA_HEAP_OPTS=“-Xmx512M”
export KAFKA_HEAP_OPTS="-Xmx512M -Djava.security.auth.login.config=/home/kafka_client_jaas.conf"
kafka-console-consumer.sh的修改类似。
4.客户端启动并认证
启动consumer
./bin/kafka-console-consumer.sh --bootstrap-server (IP):9092 --topic test --from-beginning --consumer.config config/consumer.properties
启动producer
./bin/kafka-console-producer.sh --broker-list (IP):9092 --topic test --producer.config configoducer.properties
producer端发送消息,consumer端成功接收到消息。
4.Java客户端认证
package com.zte.sdn.oscp.jms.kafka;import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.junit.Test;import java.util.Collections;
import java.util.Properties;public class KafkaTest {@Testpublic void testProduct() throws Exception {System.setProperty("java.security.auth.login.config", "F:/kafka_client_jaas.conf");Properties props = new Properties();props.put("bootstrap.servers", "IP:9092");props.put("acks", "all");props.put("retries", 0);props.put("batch.size", 16384);props.put("linger.ms", 1);props.put("buffer.memory", 33554432);props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");props.put("security.protocol", "SASL_PLAINTEXT");props.put("sasl.mechanism", "PLAIN");Producer<String, String> producer = new KafkaProducer<>(props);while (true){long startTime = System.currentTimeMillis();for (int i = 0; i < 100; i++) {producer.send(new ProducerRecord<>("kafkatest", Integer.toString(i), Integer.toString(i)));}System.out.println(System.currentTimeMillis()-startTime);Thread.sleep(5000);}}@Testpublic void testConsumer() throws Exception {System.setProperty("java.security.auth.login.config", "F:/kafka_client_jaas.conf");Properties props = new Properties();props.put("bootstrap.servers", "(IP):9092");props.put("enable.auto.commit", "true");props.put("auto.commit.interval.ms", "1000");props.put("group.id", "kafka_test_group");props.put("session.timeout.ms", "6000");props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("security.protocol", "SASL_PLAINTEXT");props.put("sasl.mechanism", "PLAIN");KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);consumer.subscribe(Collections.singletonList("kafkatest"));while (true) {long startTime = System.currentTimeMillis();ConsumerRecords<String, String> records = consumer.poll(1000);System.out.println(System.currentTimeMillis() - startTime);System.out.println("recieve message number is " + records.count());for (ConsumerRecord<String, String> record : records) {System.out.printf("offset = %d, key = %s, value = %s, partition = %d %n",record.offset(),record.key(),record.value(),record.partition());}}}
}相关文章:
kafka配置SASL/PLAIN 安全认证
1 zookeeper配置启动 1.1 zookeeper添加SASL支持 为zookeeper添加SASL支持,在配置文件zoo.cfg添加 authProvider.1org.apache.zookeeper.server.auth.SASLAuthenticationProvider requireClientAuthSchemesasl jaasLoginRenew36000001.2 zk_server_jaas.conf文件…...
pdf加密如何解除?这样解除加密很简单
pdf加密如何解除?有时,我们可能会收到一些加密的PDF文件,它们不允许我们对其进行编辑或打印。这时,我们需要使用PDF解密工具,以便能够轻松地解除PDF加密并对其进行编辑。那么接下来就给大家介绍一下pdf加密解除的方法。…...
Ubuntu18.04使用Systemback制作系统镜像并还原
系列文章目录 文章目录 系列文章目录前言一、下载Systemback工具二、制作系统镜像到U盘三、安装制作系统 前言 在Ubuntu系统中开发项目时,有时会希望将项目移植到另外一台计算机(如工控机等)上进行部署,通常会在新计算机中安装Ub…...
OpenCV(十五):拷贝图像
在OpenCV中,拷贝图像数据时有两种方式:深拷贝(Deep Copy)和浅拷贝(Shallow Copy)。这两种拷贝方式的主要区别在于是否创建新的图像副本。 浅拷贝(Shallow Copy)是指将图像对象的指针…...
原神世界中的顺序表:派蒙的趣味数据结构讲解
派蒙,那个总是带着疑问眼神的小家伙,是原神世界中的小精灵。他总是充满好奇心,无论是对新的冒险者,还是对各种奇妙的现象。而他的另一个身份,则是原神世界中的数据结构大师。 一天,派蒙遇到了旅行者小森&a…...
电脑入门:路由器 基本设置操作说明
路由器 基本设置操作说明 首先我们我设置路由器,就需要先登录路由器, 那么怎样登路由器啊? 登录路由器的方法是 在ie的地址栏输入:http://192.168.1.1 输入完成以后直接回车 那么如果你输入正确 这个时候就应该听到有用户名的提示 呵呵 这是怎么回事啊? 不要召集 首…...
搜索与图论-拓扑序列
为什么记录呢 因为不记录全忘了 虽然记了也不一定会看 有向无环图一定有拓扑序列邮箱无环图 - 拓扑图 入度为0的点作为起点入度为0的点入队列枚举出边 t->j删掉当前边,t->j . j的入度减1判断j的入度是否为0,来判断是否加入队列 有环: …...
「MySQL-05」MySQL Workbench的下载和使用
目录 一、MySQL workbench的下载和安装 1. MySQL workbench介绍 2. 到MySQL官网下载mysql workbench 3. 安装workbench 二、创建能远程登录的用户并授权 1. 创建用户oj_client 2. 创建oj数据库 3. 给用户授权 4. 在Linux上登录用户oj_client检查其是否能操作oj数据库 三、使用…...
编译期jni类型转换成字符串
背景: 例如android jni 方法的签名, 这个需要每个用户都要知道具体类型,转化成签名, 要想写好签名, 必须很熟悉 类型对应的签名, 尤其java类对象要加个L, 本文将介绍怎么在编译期过程把类型转化成字符, 多个类型在尽性拼接. 定义基础数据结构 template<char ... ch> str…...
优秀的ui设计作品(合集)
UI设计师需要了解的九个Tips 1.图片类APP排版突破 规则是死的,人是活的。很多时候,如果需求是比较宽要尝试突破原则,用一些另类的排版方式,其实也是做好设计的本质。在图片类app中,错落一些的排版会使你的作品更有魅力…...
【c/c++】c和cpp混合编译
c和cpp混合编译 #ifdef __cplusplus extern "C" { #endifextern int test(int, int);#ifdef __cplusplus } #endif在这段代码中,#ifdef __cplusplus 和 #endif 之间的代码是为了在 C 中使用 C 语言的函数声明和定义时,确保编译器正确地处理 C…...
springboot定制banner
这里有几个定制banner的网站 Text to ASCII Art Generator (TAAG) ASCII Generator IMG2TXT: ASCII Art Made Easy!...
Qt 入门实战教程(目录)
为何我要写Qt入门教程 前置课程 《C自学精简实践教程》 教程特点 1 面向企业开发,你在这里学到的任何一步操作,都会直接在企业里用到。 2 注重设计思路训练,抽象分析问题的能力。 Qt 安装 1.1 Windows Qt 5.12.10下载与安装 1.2 我们…...
Ceph入门到精通-Lunix性能分析工具汇总
出于对Linux操作系统的兴趣,以及对底层知识的强烈欲望,因此整理了这篇文章。本文也可以作为检验基础知识的指标,另外文章涵盖了一个系统的方方面面。如果没有完善的计算机系统知识,网络知识和操作系统知识,文档中的工具…...
服务器端使用django websocket,客户端使用uniapp 请问服务端和客户端群组互发消息的代码怎么写的参考笔记
2023/8/29 19:21:11 服务器端使用django websocket,客户端使用uniapp 请问服务端和客户端群组互发消息的代码怎么写 2023/8/29 19:22:25 在服务器端使用Django WebSocket和客户端使用Uniapp的情况下,以下是代码示例来实现服务器端和客户端之间的群组互发消息。 …...
【考研数学】线性代数第四章 —— 线性方程组(2,线性方程组的通解 | 理论延伸)
文章目录 引言四、线性方程组的通解4.1 齐次线性方程组4.2 非齐次线性方程组 五、方程组解的理论延伸 引言 承接前文,继续学习线性方程组的内容,从方程组的通解开始。 四、线性方程组的通解 4.1 齐次线性方程组 (1)基础解系 —…...
go读取文件的几种方法
一. 整个文件读入内存 直接将数据直接读取入内存,是效率最高的一种方式,但此种方式,仅适用于小文件,对于大文件,则不适合,因为比较浪费内存 1.直接指定文化名读取 在 Go 1.16 开始,ioutil.Rea…...
ChatGPT癌症治疗“困难重重”,真假混讲难辨真假,准确有待提高
近年来,人工智能在医疗领域的应用逐渐增多,其中自然语言处理模型如ChatGPT在提供医疗建议和信息方面引起了广泛关注。然而,最新的研究表明,尽管ChatGPT在许多领域取得了成功,但它在癌症治疗方案上的准确性仍有待提高。…...
docker打包vue vite前端项目
打包vue vite 前端项目 1.打包时将测试删除 2.修改配置 3.打包项目 npm run build 显示成功(黄的也不知道是啥) 打包好的前端文件放入 4.配置 default.conf upstream wms-app {server 你自己的ip加端口 ;server 192.168.xx.xx:8080 ; } server { …...
zookeeper 查询注册的 dubbo 服务
1. 连接zookeeper 服务端 使用bin 目录下zk客户端连接服务器, ./zkCli.sh -server 127.0.0.1:2181 2. 查询Dubbo 服务 # 查询所有服务 ls /dubbo # 查询指定服务调用 ls /dubbo/服务名(接口地址)/consumers # 查询指定服务调用 ls /dubbo/服务名(接口地址)/pr…...
Python爬虫实战:研究MechanicalSoup库相关技术
一、MechanicalSoup 库概述 1.1 库简介 MechanicalSoup 是一个 Python 库,专为自动化交互网站而设计。它结合了 requests 的 HTTP 请求能力和 BeautifulSoup 的 HTML 解析能力,提供了直观的 API,让我们可以像人类用户一样浏览网页、填写表单和提交请求。 1.2 主要功能特点…...
【根据当天日期输出明天的日期(需对闰年做判定)。】2022-5-15
缘由根据当天日期输出明天的日期(需对闰年做判定)。日期类型结构体如下: struct data{ int year; int month; int day;};-编程语言-CSDN问答 struct mdata{ int year; int month; int day; }mdata; int 天数(int year, int month) {switch (month){case 1: case 3:…...
多模态2025:技术路线“神仙打架”,视频生成冲上云霄
文|魏琳华 编|王一粟 一场大会,聚集了中国多模态大模型的“半壁江山”。 智源大会2025为期两天的论坛中,汇集了学界、创业公司和大厂等三方的热门选手,关于多模态的集中讨论达到了前所未有的热度。其中,…...
设计模式和设计原则回顾
设计模式和设计原则回顾 23种设计模式是设计原则的完美体现,设计原则设计原则是设计模式的理论基石, 设计模式 在经典的设计模式分类中(如《设计模式:可复用面向对象软件的基础》一书中),总共有23种设计模式,分为三大类: 一、创建型模式(5种) 1. 单例模式(Sing…...
基于Flask实现的医疗保险欺诈识别监测模型
基于Flask实现的医疗保险欺诈识别监测模型 项目截图 项目简介 社会医疗保险是国家通过立法形式强制实施,由雇主和个人按一定比例缴纳保险费,建立社会医疗保险基金,支付雇员医疗费用的一种医疗保险制度, 它是促进社会文明和进步的…...
Python如何给视频添加音频和字幕
在Python中,给视频添加音频和字幕可以使用电影文件处理库MoviePy和字幕处理库Subtitles。下面将详细介绍如何使用这些库来实现视频的音频和字幕添加,包括必要的代码示例和详细解释。 环境准备 在开始之前,需要安装以下Python库:…...
用机器学习破解新能源领域的“弃风”难题
音乐发烧友深有体会,玩音乐的本质就是玩电网。火电声音偏暖,水电偏冷,风电偏空旷。至于太阳能发的电,则略显朦胧和单薄。 不知你是否有感觉,近两年家里的音响声音越来越冷,听起来越来越单薄? —…...
HarmonyOS运动开发:如何用mpchart绘制运动配速图表
##鸿蒙核心技术##运动开发##Sensor Service Kit(传感器服务)# 前言 在运动类应用中,运动数据的可视化是提升用户体验的重要环节。通过直观的图表展示运动过程中的关键数据,如配速、距离、卡路里消耗等,用户可以更清晰…...
Golang——6、指针和结构体
指针和结构体 1、指针1.1、指针地址和指针类型1.2、指针取值1.3、new和make 2、结构体2.1、type关键字的使用2.2、结构体的定义和初始化2.3、结构体方法和接收者2.4、给任意类型添加方法2.5、结构体的匿名字段2.6、嵌套结构体2.7、嵌套匿名结构体2.8、结构体的继承 3、结构体与…...
Qemu arm操作系统开发环境
使用qemu虚拟arm硬件比较合适。 步骤如下: 安装qemu apt install qemu-system安装aarch64-none-elf-gcc 需要手动下载,下载地址:https://developer.arm.com/-/media/Files/downloads/gnu/13.2.rel1/binrel/arm-gnu-toolchain-13.2.rel1-x…...
