当前位置: 首页 > news >正文

kafka配置SASL/PLAIN 安全认证

1 zookeeper配置启动

1.1 zookeeper添加SASL支持

为zookeeper添加SASL支持,在配置文件zoo.cfg添加

authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
requireClientAuthScheme=sasl
jaasLoginRenew=3600000

1.2 zk_server_jaas.conf文件

新建zk_server_jaas.conf文件,为Zookeeper添加账号认证信息.这个文件你放在哪里随意,只要后面zkEnv配置正确的路径就好了。我是放在/opt/zookeeper/conf/home路径下。zk_server_jaas.conf文件的内容如下
Server {
org.apache.kafka.common.security.plain.PlainLoginModule required
username=“cluster”
password=“clusterpasswd”
user_kafka=“kafkapasswd”;
};`
username和paasword是zk集群之间的认证密码。
user_kafka=“kafkapasswd"定义了一个用户"kafka”,密码是"kafkapasswd",本次测试用户是kafka broker。

1.3 引入jar包

由上一步可发现,认证方式使用的是Kafka的认证类org.apache.kafka.common.security.plain.PlainLoginModule。因此zk需要依赖几个jar包。
在home下新建zk_sasl_dependency目录,从kafka/lib目录下复制以下几个jar包到该目录下。根据kafka版本不同,几个jar包的版本可能不一样

-rw-r--r-- 1 root root 1893564 Aug 29 10:53 kafka-clients-2.0.0.jar
-rw-r--r-- 1 root root  489884 Aug 29 11:14 log4j-1.2.17.jar
-rw-r--r-- 1 root root  370137 Aug 29 10:53 lz4-java-1.4.1.jar
-rw-r--r-- 1 root root   41203 Aug 29 10:53 slf4j-api-1.7.25.jar
-rw-r--r-- 1 root root   12244 Aug 29 10:53 slf4j-log4j12-1.7.25.jar
-rw-r--r-- 1 root root 2019013 Aug 29 10:53 snappy-java-1.1.7.1.jar
[root@sm_qf-bj_hydgpt_192-168-151-168 home]# 

1.4.修改zkEnv.sh

在zkEnv.sh添加


for i in /opt/zookeeper/conf/home/zk_sasl_dependency/*.jar
doCLASSPATH="$i:$CLASSPATH"
done
SERVER_JVMFLAGS=" -Djava.security.auth.login.config=/opt/zookeeper/conf/home/zk_server_jaas.conf "

1.5.启动zk服务端

执行./zkServer.sh restart重新启动zk。如果启动异常查看日志排查问题

2 kafka配置和启动

2.1.新建kafka_server_jaas.conf,为kafka添加认证信息

KafkaServer {org.apache.kafka.common.security.plain.PlainLoginModule requiredusername="cluster"password="cluster"user_cluster=“clusterpasswd”user_kafka="kafkapasswd" ;
};
Client{org.apache.kafka.common.security.plain.PlainLoginModule required  username="kafka"  password="kafkapasswd";  
};

KafkaServer,第一行指定了认证方法为PLAIN,usernam和password是kafka的多个broker之间进行认证的账号密码。
user_kafka="kafkapasswd"设置了用户kafka,密码为kafkapswd,用于客户端的生产者和消费者连接认证。
网上的说法是 Client,是kafka作为用户使用zk的认证信息,这里的username和password一定要和zk_server_jaas.conf的配置对的上。

2. 2.在kafka的配置文件开启SASL认证

listeners=SASL_PLAINTEXT://(IP):9092
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.mechanism.inter.broker.protocol=PLAIN 
sasl.enabled.mechanisms=PLAIN
allow.everyone.if.no.acl.found=true

2.3 .在server启动脚本JVM参数

export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"
改为
export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G -Djava.security.auth.login.config=/home/kafka_server_jaas.conf"

2.4 启动

./kafka-server-start.sh ../config/server.properties

2.5

kafka服务端正常启动后,应该会有类似下面这行的日志信息,说明认证功能开启成功

Registered broker 0 at path /brokers/ids/0 with addresses: EndPoint((IP),9092,ListenerName(SASL_PLAINTEXT),SASL_PLAINTEXT) (kafka.utils.ZkUtils)

3 kafka的SASL认证功能认证和使用

1.使用kafka脚本认证

我们使用kafka自带的脚本进行认证。

1.新建kafka_client_jaas.conf,为客户端添加认证信息

在/home下新建kafka_client_jaas.conf,添加以下信息

KafkaClient {org.apache.kafka.common.security.plain.PlainLoginModule requiredusername="kafka"password="kafkapasswd";
};

2.修改客户端配置信息

修改producer.properties和consumer.properties,添加认证机制

security.protocol=SASL_PLAINTEXT 
sasl.mechanism=PLAIN 

3.修改客户端启动脚本

修改kafka-console-producer.sh,配置认证文件kafka_client_jaas.conf,将

export KAFKA_HEAP_OPTS=“-Xmx512M”

export KAFKA_HEAP_OPTS="-Xmx512M -Djava.security.auth.login.config=/home/kafka_client_jaas.conf"

kafka-console-consumer.sh的修改类似。

4.客户端启动并认证

启动consumer

./bin/kafka-console-consumer.sh --bootstrap-server (IP):9092 --topic test --from-beginning --consumer.config config/consumer.properties

启动producer

./bin/kafka-console-producer.sh --broker-list (IP):9092 --topic test --producer.config configoducer.properties

producer端发送消息,consumer端成功接收到消息。

4.Java客户端认证

package com.zte.sdn.oscp.jms.kafka;import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.junit.Test;import java.util.Collections;
import java.util.Properties;public class KafkaTest {@Testpublic void testProduct() throws Exception {System.setProperty("java.security.auth.login.config", "F:/kafka_client_jaas.conf");Properties props = new Properties();props.put("bootstrap.servers", "IP:9092");props.put("acks", "all");props.put("retries", 0);props.put("batch.size", 16384);props.put("linger.ms", 1);props.put("buffer.memory", 33554432);props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");props.put("security.protocol", "SASL_PLAINTEXT");props.put("sasl.mechanism", "PLAIN");Producer<String, String> producer = new KafkaProducer<>(props);while (true){long startTime = System.currentTimeMillis();for (int i = 0; i < 100; i++) {producer.send(new ProducerRecord<>("kafkatest", Integer.toString(i), Integer.toString(i)));}System.out.println(System.currentTimeMillis()-startTime);Thread.sleep(5000);}}@Testpublic void testConsumer() throws Exception {System.setProperty("java.security.auth.login.config", "F:/kafka_client_jaas.conf");Properties props = new Properties();props.put("bootstrap.servers", "(IP):9092");props.put("enable.auto.commit", "true");props.put("auto.commit.interval.ms", "1000");props.put("group.id", "kafka_test_group");props.put("session.timeout.ms", "6000");props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("security.protocol", "SASL_PLAINTEXT");props.put("sasl.mechanism", "PLAIN");KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);consumer.subscribe(Collections.singletonList("kafkatest"));while (true) {long startTime = System.currentTimeMillis();ConsumerRecords<String, String> records = consumer.poll(1000);System.out.println(System.currentTimeMillis() - startTime);System.out.println("recieve message number is " + records.count());for (ConsumerRecord<String, String> record : records) {System.out.printf("offset = %d, key = %s, value = %s, partition = %d %n",record.offset(),record.key(),record.value(),record.partition());}}}
}

相关文章:

kafka配置SASL/PLAIN 安全认证

1 zookeeper配置启动 1.1 zookeeper添加SASL支持 为zookeeper添加SASL支持&#xff0c;在配置文件zoo.cfg添加 authProvider.1org.apache.zookeeper.server.auth.SASLAuthenticationProvider requireClientAuthSchemesasl jaasLoginRenew36000001.2 zk_server_jaas.conf文件…...

pdf加密如何解除?这样解除加密很简单

pdf加密如何解除&#xff1f;有时&#xff0c;我们可能会收到一些加密的PDF文件&#xff0c;它们不允许我们对其进行编辑或打印。这时&#xff0c;我们需要使用PDF解密工具&#xff0c;以便能够轻松地解除PDF加密并对其进行编辑。那么接下来就给大家介绍一下pdf加密解除的方法。…...

Ubuntu18.04使用Systemback制作系统镜像并还原

系列文章目录 文章目录 系列文章目录前言一、下载Systemback工具二、制作系统镜像到U盘三、安装制作系统 前言 在Ubuntu系统中开发项目时&#xff0c;有时会希望将项目移植到另外一台计算机&#xff08;如工控机等&#xff09;上进行部署&#xff0c;通常会在新计算机中安装Ub…...

OpenCV(十五):拷贝图像

在OpenCV中&#xff0c;拷贝图像数据时有两种方式&#xff1a;深拷贝&#xff08;Deep Copy&#xff09;和浅拷贝&#xff08;Shallow Copy&#xff09;。这两种拷贝方式的主要区别在于是否创建新的图像副本。 浅拷贝&#xff08;Shallow Copy&#xff09;是指将图像对象的指针…...

原神世界中的顺序表:派蒙的趣味数据结构讲解

派蒙&#xff0c;那个总是带着疑问眼神的小家伙&#xff0c;是原神世界中的小精灵。他总是充满好奇心&#xff0c;无论是对新的冒险者&#xff0c;还是对各种奇妙的现象。而他的另一个身份&#xff0c;则是原神世界中的数据结构大师。 一天&#xff0c;派蒙遇到了旅行者小森&a…...

电脑入门:路由器 基本设置操作说明

路由器 基本设置操作说明 首先我们我设置路由器,就需要先登录路由器, 那么怎样登路由器啊? 登录路由器的方法是 在ie的地址栏输入:http://192.168.1.1 输入完成以后直接回车 那么如果你输入正确 这个时候就应该听到有用户名的提示 呵呵 这是怎么回事啊? 不要召集 首…...

搜索与图论-拓扑序列

为什么记录呢 因为不记录全忘了 虽然记了也不一定会看 有向无环图一定有拓扑序列邮箱无环图 - 拓扑图 入度为0的点作为起点入度为0的点入队列枚举出边 t->j删掉当前边&#xff0c;t->j . j的入度减1判断j的入度是否为0&#xff0c;来判断是否加入队列 有环&#xff1a; …...

「MySQL-05」MySQL Workbench的下载和使用

目录 一、MySQL workbench的下载和安装 1. MySQL workbench介绍 2. 到MySQL官网下载mysql workbench 3. 安装workbench 二、创建能远程登录的用户并授权 1. 创建用户oj_client 2. 创建oj数据库 3. 给用户授权 4. 在Linux上登录用户oj_client检查其是否能操作oj数据库 三、使用…...

编译期jni类型转换成字符串

背景: 例如android jni 方法的签名, 这个需要每个用户都要知道具体类型,转化成签名, 要想写好签名, 必须很熟悉 类型对应的签名, 尤其java类对象要加个L, 本文将介绍怎么在编译期过程把类型转化成字符, 多个类型在尽性拼接. 定义基础数据结构 template<char ... ch> str…...

优秀的ui设计作品(合集)

UI设计师需要了解的九个Tips 1.图片类APP排版突破 规则是死的&#xff0c;人是活的。很多时候&#xff0c;如果需求是比较宽要尝试突破原则&#xff0c;用一些另类的排版方式&#xff0c;其实也是做好设计的本质。在图片类app中&#xff0c;错落一些的排版会使你的作品更有魅力…...

【c/c++】c和cpp混合编译

c和cpp混合编译 #ifdef __cplusplus extern "C" { #endifextern int test(int, int);#ifdef __cplusplus } #endif在这段代码中&#xff0c;#ifdef __cplusplus 和 #endif 之间的代码是为了在 C 中使用 C 语言的函数声明和定义时&#xff0c;确保编译器正确地处理 C…...

springboot定制banner

这里有几个定制banner的网站 Text to ASCII Art Generator (TAAG) ASCII Generator IMG2TXT: ASCII Art Made Easy!...

Qt 入门实战教程(目录)

为何我要写Qt入门教程 前置课程 《C自学精简实践教程》 教程特点 1 面向企业开发&#xff0c;你在这里学到的任何一步操作&#xff0c;都会直接在企业里用到。 2 注重设计思路训练&#xff0c;抽象分析问题的能力。 Qt 安装 1.1 Windows Qt 5.12.10下载与安装 1.2 我们…...

Ceph入门到精通-Lunix性能分析工具汇总

出于对Linux操作系统的兴趣&#xff0c;以及对底层知识的强烈欲望&#xff0c;因此整理了这篇文章。本文也可以作为检验基础知识的指标&#xff0c;另外文章涵盖了一个系统的方方面面。如果没有完善的计算机系统知识&#xff0c;网络知识和操作系统知识&#xff0c;文档中的工具…...

服务器端使用django websocket,客户端使用uniapp 请问服务端和客户端群组互发消息的代码怎么写的参考笔记

2023/8/29 19:21:11 服务器端使用django websocket,客户端使用uniapp 请问服务端和客户端群组互发消息的代码怎么写 2023/8/29 19:22:25 在服务器端使用Django WebSocket和客户端使用Uniapp的情况下&#xff0c;以下是代码示例来实现服务器端和客户端之间的群组互发消息。 …...

【考研数学】线性代数第四章 —— 线性方程组(2,线性方程组的通解 | 理论延伸)

文章目录 引言四、线性方程组的通解4.1 齐次线性方程组4.2 非齐次线性方程组 五、方程组解的理论延伸 引言 承接前文&#xff0c;继续学习线性方程组的内容&#xff0c;从方程组的通解开始。 四、线性方程组的通解 4.1 齐次线性方程组 &#xff08;1&#xff09;基础解系 —…...

go读取文件的几种方法

一. 整个文件读入内存 直接将数据直接读取入内存&#xff0c;是效率最高的一种方式&#xff0c;但此种方式&#xff0c;仅适用于小文件&#xff0c;对于大文件&#xff0c;则不适合&#xff0c;因为比较浪费内存 1.直接指定文化名读取 在 Go 1.16 开始&#xff0c;ioutil.Rea…...

ChatGPT癌症治疗“困难重重”,真假混讲难辨真假,准确有待提高

近年来&#xff0c;人工智能在医疗领域的应用逐渐增多&#xff0c;其中自然语言处理模型如ChatGPT在提供医疗建议和信息方面引起了广泛关注。然而&#xff0c;最新的研究表明&#xff0c;尽管ChatGPT在许多领域取得了成功&#xff0c;但它在癌症治疗方案上的准确性仍有待提高。…...

docker打包vue vite前端项目

打包vue vite 前端项目 1.打包时将测试删除 2.修改配置 3.打包项目 npm run build 显示成功&#xff08;黄的也不知道是啥&#xff09; 打包好的前端文件放入 4.配置 default.conf upstream wms-app {server 你自己的ip加端口 ;server 192.168.xx.xx:8080 ; } server { …...

zookeeper 查询注册的 dubbo 服务

1. 连接zookeeper 服务端 使用bin 目录下zk客户端连接服务器&#xff0c; ./zkCli.sh -server 127.0.0.1:2181 2. 查询Dubbo 服务 # 查询所有服务 ls /dubbo # 查询指定服务调用 ls /dubbo/服务名(接口地址)/consumers # 查询指定服务调用 ls /dubbo/服务名(接口地址)/pr…...

Python爬虫实战:研究MechanicalSoup库相关技术

一、MechanicalSoup 库概述 1.1 库简介 MechanicalSoup 是一个 Python 库,专为自动化交互网站而设计。它结合了 requests 的 HTTP 请求能力和 BeautifulSoup 的 HTML 解析能力,提供了直观的 API,让我们可以像人类用户一样浏览网页、填写表单和提交请求。 1.2 主要功能特点…...

【根据当天日期输出明天的日期(需对闰年做判定)。】2022-5-15

缘由根据当天日期输出明天的日期(需对闰年做判定)。日期类型结构体如下&#xff1a; struct data{ int year; int month; int day;};-编程语言-CSDN问答 struct mdata{ int year; int month; int day; }mdata; int 天数(int year, int month) {switch (month){case 1: case 3:…...

多模态2025:技术路线“神仙打架”,视频生成冲上云霄

文&#xff5c;魏琳华 编&#xff5c;王一粟 一场大会&#xff0c;聚集了中国多模态大模型的“半壁江山”。 智源大会2025为期两天的论坛中&#xff0c;汇集了学界、创业公司和大厂等三方的热门选手&#xff0c;关于多模态的集中讨论达到了前所未有的热度。其中&#xff0c;…...

设计模式和设计原则回顾

设计模式和设计原则回顾 23种设计模式是设计原则的完美体现,设计原则设计原则是设计模式的理论基石, 设计模式 在经典的设计模式分类中(如《设计模式:可复用面向对象软件的基础》一书中),总共有23种设计模式,分为三大类: 一、创建型模式(5种) 1. 单例模式(Sing…...

基于Flask实现的医疗保险欺诈识别监测模型

基于Flask实现的医疗保险欺诈识别监测模型 项目截图 项目简介 社会医疗保险是国家通过立法形式强制实施&#xff0c;由雇主和个人按一定比例缴纳保险费&#xff0c;建立社会医疗保险基金&#xff0c;支付雇员医疗费用的一种医疗保险制度&#xff0c; 它是促进社会文明和进步的…...

Python如何给视频添加音频和字幕

在Python中&#xff0c;给视频添加音频和字幕可以使用电影文件处理库MoviePy和字幕处理库Subtitles。下面将详细介绍如何使用这些库来实现视频的音频和字幕添加&#xff0c;包括必要的代码示例和详细解释。 环境准备 在开始之前&#xff0c;需要安装以下Python库&#xff1a;…...

用机器学习破解新能源领域的“弃风”难题

音乐发烧友深有体会&#xff0c;玩音乐的本质就是玩电网。火电声音偏暖&#xff0c;水电偏冷&#xff0c;风电偏空旷。至于太阳能发的电&#xff0c;则略显朦胧和单薄。 不知你是否有感觉&#xff0c;近两年家里的音响声音越来越冷&#xff0c;听起来越来越单薄&#xff1f; —…...

HarmonyOS运动开发:如何用mpchart绘制运动配速图表

##鸿蒙核心技术##运动开发##Sensor Service Kit&#xff08;传感器服务&#xff09;# 前言 在运动类应用中&#xff0c;运动数据的可视化是提升用户体验的重要环节。通过直观的图表展示运动过程中的关键数据&#xff0c;如配速、距离、卡路里消耗等&#xff0c;用户可以更清晰…...

Golang——6、指针和结构体

指针和结构体 1、指针1.1、指针地址和指针类型1.2、指针取值1.3、new和make 2、结构体2.1、type关键字的使用2.2、结构体的定义和初始化2.3、结构体方法和接收者2.4、给任意类型添加方法2.5、结构体的匿名字段2.6、嵌套结构体2.7、嵌套匿名结构体2.8、结构体的继承 3、结构体与…...

Qemu arm操作系统开发环境

使用qemu虚拟arm硬件比较合适。 步骤如下&#xff1a; 安装qemu apt install qemu-system安装aarch64-none-elf-gcc 需要手动下载&#xff0c;下载地址&#xff1a;https://developer.arm.com/-/media/Files/downloads/gnu/13.2.rel1/binrel/arm-gnu-toolchain-13.2.rel1-x…...