【shell-10】shell实现的各种kafka脚本
kafka-shell工具
- 背景
- 日志 log
- 一.启动kafka->(start-kafka)
- 二.停止kafka->(stop-kafka)
- 三.创建topic->(create-topic)
- 四.删除topic->(delete-topic)
- 五.获取topic列表->(list-topic)
- 六. 将文件数据 录入到kafka->(file-to-kafka)
- 七.将kafka数据 下载到文件->(kafka-to-file)
- 八. 查看topic的groupID消费情况->(list-group)
背景
注意:我用的kafka版本是 3.2.1 其他版本kafka提供的 命令行可能有细微区别。
因为经常要用kafka环境参与测试,所以写了不少脚本。在很多时候可以大大提高测试的效率。
主要包含如下功能:
topic的管理【创建,删除】
topic信息查看【topic列表,topic groupid 消费情况】
topic数据传输【file数据录入到topic,topic数据下载到本地文件】
脚本中做了各种检查,日志的输出做了颜色区分,用起来没啥问题。
日志 log
此文件是个额外的日志文件主要用于打印日志,该文件会被下面的shell文件引用
#!/bin/bash
#日志级别 debug-1, info-2, warn-3, error-4, always-5
LOG_LEVEL=2#调试日志
function log_debug(){content="$(date '+%Y-%m-%d %H:%M:%S') [DEBUG]: $@"[ $LOG_LEVEL -le 1 ] && echo -e "\033[32m" ${content} "\033[0m"
}
#信息日志
function log_info(){content="$(date '+%Y-%m-%d %H:%M:%S') [INGO]: $@"[ $LOG_LEVEL -le 2 ] && echo -e "\033[32m" ${content} "\033[0m"
}
#警告日志
function log_warn(){content="$(date '+%Y-%m-%d %H:%M:%S') [WARN] $@"[ $LOG_LEVEL -le 3 ] && echo -e "\033[33m" ${content} "\033[0m"
}
#错误日志
function log_err(){content="$(date '+%Y-%m-%d %H:%M:%S') [ERROR]: $@"[ $LOG_LEVEL -le 4 ] && echo -e "\033[31m" ${content} "\033[0m"
}
~
一.启动kafka->(start-kafka)
下面代码中的路径你要替换成自己的路径
#!/bin/bash
source /home/shell/logpid=`ps -aux | grep /home/kafka/kafka_2.12-3.2.1/bin/ | grep -v grep |awk '{print$2}'`
log_info "Start checking kafka process"
if [ -z $pid ]; thenlog_info "The kafka process does not exist, startting.........................................................................................."
elselog_warn "The kafka process exists and does not need to be started"exit 1
fi
nohup kafka-server-start.sh /home/kafka/kafka_2.12-3.2.1/config/server.properties >>/home/kafka/kafka.log 2>&1 &
# 日志的路径是安装kafka的时候指定的,也要替换成自己的路径
tail -f 20 /home/kafka/kafka.log
二.停止kafka->(stop-kafka)
下面代码中的路径你要替换成自己的路径
#!/bin/bash
source /home/shell/log
log_info "Start checking kafka process"
pid=`ps -aux | grep /home/kafka/kafka_2.12-3.2.1/bin/ | grep -v grep |awk '{print$2}'`
if [ -z $pid ]; thenlog_warn "The kafka process does not exist and does not need to be stopped"exit 1
elselog_info "The kafka process alive, stopping.............................................................................................................."
fi
kafka-server-stop.sh
log_info "Stop kafka success"
三.创建topic->(create-topic)
下面代码中的路径你要替换成自己的路径
#!/bin/bash
source /home/shell/log
log_info "脚本功能: 创建topic"
log_info "脚本参数: topic名称(必选)"
if [ $# -ne 1 ]; thenlog_err "错误:请传入topic名称"exit 1
fi
#TOPIC名称
TOPIC_NAME=$1
#KAFKA地址
KAFKA_BROKER=ip:9092
# 检查Kafka主题是否存在, 若已存在则放弃创建
if kafka-topics.sh --bootstrap-server $KAFKA_BROKER --list | grep -q "^$TOPIC_NAME$";thenlog_warn "$TOPIC_NAME 已经存在,放弃创建"
else# 默认1副本,3分区kafka-topics.sh --create --bootstrap-server $KAFKA_BROKER --replication-factor 1 --partitions 3 --topic $TOPIC_NAMElog_info "请执行topic-list检查创建是否成功"
fi
~
四.删除topic->(delete-topic)
下面代码中的路径你要替换成自己的路径
#!/bin/bashsource /home/shell/log
log_info "脚本作用:删除topic"
log_info "脚本参数: 支持多个topic,用空格分开,可以批量删除"
KAFKA_BROKER=ip:9092
function check_kafka_topic() {local local_topic_name=$1if kafka-topics.sh --bootstrap-server $KAFKA_BROKER --list | grep -q "^$local_topic_name$";thenlog_info "$local_topic_name存在->true"return 0 # 返回true elselog_warn "$local_topic_name 不存在->false"return 1 # return falsefi
}# 逐个删除topic
for topic in "$@"
doif ! check_kafka_topic $topic; thenlog_info "tpoic->$topic 不存在,跳过删除行为"continueelselog_info "topic->$topic 执行删除"kafka-topics.sh --delete --bootstrap-server $KAFKA_BROKER --topic $topiclog_info "topic->$topic 删除成功"fi
done
五.获取topic列表->(list-topic)
#!/bin/bash
source /home/shell/log
KAFKA_BROKER=ip:9092
log_info "脚本作用: 列出topic信息"
log_info "脚本参数: topic名称(可选->未指定topic则列出所有topic信息)"
if [ $# -eq 1 ]; thenlog_info "目标$1 详情如下"kafka-topics.sh --describe --bootstrap-server $KAFKA_BROKER | grep -v "__consumer_offsets" | grep $1
elselog_info "所有topic 列表如下:"kafka-topics.sh --describe --bootstrap-server $KAFKA_BROKER | grep -v "__consumer_offsets"
fi
六. 将文件数据 录入到kafka->(file-to-kafka)
#!/bin/bash
source /home/shell/log
log_info "脚本作用: 将文件中的数据录入指定topic"
log_info "脚本参数: 1.文件路劲(必选) 2.topic(必选)"
log_info "参数校验"
log_info "执行条件检查.........................................................................................................."
if [ $# -ne 2 ]; thenlog_err "必须传入两个参数: 1.文件路劲(必选) 2.topic(必选)"exit 1
fiif ! [ -f $1 ]; thenlog_err "$1不是一个有效的数据文件"exit 1
fiFILE_PATH=$1
TOPIC_NAME=$2
KAFKA_BROKER=ip:9092 #检查topic是否存在
function check_kafka_topic() {local local_KAFKA_BROKER=$1if kafka-topics.sh --bootstrap-server $KAFKA_BROKER --list | grep -q "^$local_KAFKA_BROKER$";thenreturn 0 # 返回true elsereturn 1 # return falsefi
}#将文件数据推送到kafka
function send_to_kafka(){local local_path=$1local count=0while IFS= read -r line; do kafka-console-producer.sh --broker-list $KAFKA_BROKER --topic $TOPIC_NAME <<< "$line" count=$((count+1))done < "$local_path"echo $count
} if ! check_kafka_topic $TOPIC_NAME;thenlog_err "条件检查不通过, 原因: topic->$TOPIC_NAME不存在, 请先创建topic"exit 1
filog_info "参数检查通过.........................................................................................................."
start_time=`date "+%Y-%m-%d %H:%M:%S"`
start_seconds=$(date -d "$start_time" +%s)log_info "开始录入数............................................................................................................"
count=$(send_to_kafka $FILE_PATH)end_time=`date "+%Y-%m-%d %H:%M:%S"`
end_seconds=$(date -d "$end_time" +%s)
time_diff=$((end_seconds - start_seconds)) log_info "录入条数: $count"
log_info "花费时间:$time_diff 秒"
log_info "录入完成.............................................................................................................."
七.将kafka数据 下载到文件->(kafka-to-file)
#!/bin/bash
source /home/shell/log
log_info "脚本作用: 将kafka指定topic的数据消费到指定文件中"
log_info "脚本参数:1.数据文件路径(必选) 2.topic名称(必选) 3.groupID(可选->不存在则从头消费,存在则从grooupID offset 开始消费)"
log_info "group-list 脚本可以查看当前的"
# Kafka的bin目录
KAFKA_BIN_DIR=/path/to/kafka/bin#kafka 地址
KAFKA_SERVER=ip:9092 # Kafka的配置文件目录
KAFKA_CONFIG_DIR=/home/kafka/kafka_2.12-3.2.1/config# Kafka消费者配置文件
CONSUMER_CONFIG=$KAFKA_CONFIG_DIR/consumer.properties# 指定要消费的主题
TOPIC_NAME=your_topic_name# 指定要写入的文件
FILE_PATH=$1
TOPIC_NAME=$2
GROUP_ID=$3log_info "执行检察............................................................................................................................"function check_kafka_topic() {local local_topic_name=$1if kafka-topics.sh --bootstrap-server $KAFKA_SERVER --list | grep -q "^$local_topic_name$";thenreturn 0 # 返回true elsereturn 1 # return falsefi
}if ! check_kafka_topic $TOPIC_NAME;thenlog_err "topic->$TOPIC_NAME 未找到"exit 1
fi
log_info "检查通过............................................................................................................................"log_info "当前topic,所有groupID的消费情况如下>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>"
while IFS= read -r line; doif [[ $line == *"PARTITION"* ]]; thencontent="$(date '+%Y-%m-%d %H:%M:%S') [INFO] $line"echo -e "\033[45m" ${content} "\033[0m"else log_info "$line"fi
done< <(kafka-consumer-groups.sh --bootstrap-server $KAFKA_SERVER --describe --all-groups | grep -v '__consumer_offsets' | grep "$TOPIC_NAME\|PARTITION")log_info "当前topic,所有groupID的消费情况输出完成>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>"log_info "消费进程运行中( CTRL+C 可退出消费 )................................................................................................."
# 运行消费者脚本并将输出重定向到文件
if [ $# -eq 2 ]; thenkafka-console-consumer.sh --bootstrap-server $KAFKA_SERVER --topic $TOPIC_NAME --from-beginning > $FILE_PATH
fi
if [ $# -eq 3 ]; thenkafka-console-consumer.sh --bootstrap-server $KAFKA_SERVER --topic $TOPIC_NAME --from-beginning --group $GROUP_ID > $FILE_PATH
fi
八. 查看topic的groupID消费情况->(list-group)
#!/bin/bash
kafka_broker=ip:9092
source /home/shell/log
log_info "脚本功能: 查看topic的groupID信息"
log_info "脚本参数: topic名称(可选->未指定topic则列出所有topic的groupID信息)"
function check_kafka_topic() {local local_topic_name=$1if kafka-topics.sh --bootstrap-server $kafka_broker --list | grep -q "^$local_topic_name$";thenlog_info "$local_topic_name存在->true"return 0 # 返回true elselog_warn "$local_topic_name 不存在->false"return 1 # return falsefi
}if [ $# -eq 1 ]; thenif ! check_kafka_topic $1; then#topic 不存在则直接退出程序log_warn "topic=$1, 不存在"exit 1filog_info "topic_name=$1 的gruoupID信息如下:"kafka-consumer-groups.sh --bootstrap-server $kafka_broker --describe --all-groups | grep $1 | grep -v __consumer_offsets
elselog_info "所有groupID信息如下:"kafka-consumer-groups.sh --bootstrap-server $kafka_broker --describe --all-groups | grep -v __consumer_offsets
fi
相关文章:

【shell-10】shell实现的各种kafka脚本
kafka-shell工具 背景日志 log一.启动kafka->(start-kafka)二.停止kafka->(stop-kafka)三.创建topic->(create-topic)四.删除topic->(delete-topic)五.获取topic列表->(list-topic)六. 将文件数据 录入到kafka->(file-to-kafka)七.将kafka数据 下载到文件-&g…...
【模型压缩】模型剪枝详解
参考链接:https://zhuanlan.zhihu.com/p/635454943 https 文章目录 1. 前言1.1 为什么要进行模型剪枝1.2 为什么可以进行模型剪枝2. 剪枝方式的几种分类2.1 结构化剪枝 和 非结构化剪枝2.1.1 结构化剪枝2.1.2 非结构化剪枝2.2 静态剪枝与动态剪枝2.2.1 静态剪枝2.2.2 动态剪枝…...

Log4j2-01-log4j2 hello world 入门使用
拓展阅读 Log4j2 系统学习 Logback 系统学习 Slf4j Slf4j-02-slf4j 与 logback 整合 SLF4j MDC-日志添加唯一标识 分布式链路追踪-05-mdc 等信息如何跨线程? Log4j2 与 logback 的实现方式 日志开源组件(一)java 注解结合 spring aop 实现自动输…...

Mysql-日志介绍 日志配置
环境部署 docker run -d -p 3306:3306 --privilegedtrue -v $(pwd)/logs:/var/lib/logs -v $(pwd)/conf:/etc/mysql/conf.d -v $(pwd)/data:/var/lib/mysql -e MYSQL_ROOT_PASSWORD654321 --name mysql mysql:5.7运行指令的目录下新建好这些文件: 日志类型 日…...

计算机网络的体系结构的各层在整个过程中起到什么作用?
ps:本文章的图片内容来源都是来自于湖科大教书匠的视频,声明:仅供自己复习,里面加上了自己的理解 这里附上视频链接地址:1.6 计算机网络体系结构(4)—专用术语_哔哩哔哩_bilibili 目录 &#x…...

如何在业务代码中优雅的使用策略模式?
策略模式介绍 假设你正在开发一个电商平台,其中涉及到商品的折扣策略。优惠策略有很多种可能,如领取优惠券抵扣、返现促销、拼团优惠等。最初的实现可能会在购物车类中嵌入各种折扣逻辑,导致代码的可维护性和扩展性下降。 下面代码在业务开…...
“docker-credential-desktop.exe“: executable file not found in $PATH 错误解决
"docker-credential-desktop.exe": executable file not found in $PATH 错误解决 1. 错误信息和解决方法 1. 错误信息和解决方法 错误信息, error getting credentials - err: exec: "docker-credential-desktop.exe": executable file not …...
openssl3.2/test/certs - 055 - all DNS-like CNs allowed by CA1, no DNS SANs
文章目录 openssl3.2/test/certs - 055 - all DNS-like CNs allowed by CA1, no DNS SANs概述笔记END openssl3.2/test/certs - 055 - all DNS-like CNs allowed by CA1, no DNS SANs 概述 openssl3.2 - 官方demo学习 - test - certs 笔记 /*! * \file D:\my_dev\my_local_…...
长虹智能电视6000iD、6080iD、3000iD、U2系列等 ZLM61HiPJ机芯升级刷机方法,附刷机数据
机芯:ZLM61HiPJ 适用机型:UD39B6000iD、UD42B6000iD、UD50B6000iD、UD55B6000iD、UD42C6000iD、UD42C6080iD、UD49C6000iD、UD49C6080iD、UD55C6000iD、UD55C6080iD、UD50C6000iD、UD58C3000iD、42U2 LE42C19S-UD、LE50C29SD-UD、 UD49C6000iD(LJM2W)、…...

六、VTK创建平面vtkPlaneSource
vtkPlaneSource创建位于平面中的四边形数组 先看看效果图: vtkPlaneSource 创建一个 m x n 个四边形数组,这些四边形在平面中排列为规则平铺。通过指定一个原点来定义平面,然后指定另外两个点,这两个点与原点一起定义平面的两个轴。这些轴不必是正交的 - 因此您可以创建平行…...

LiveGBS流媒体平台GB/T28181常见问题-如何配置使用自己已有的redis服务替换redis版本升级redis版本
LiveGBS如何配置使用自己已有的redis服务替换redis版本升级redis版本 1、Redis服务2、如何切换REDIS?2.1、停止启动REDIS2.2、配置信令服务2.3、配置流媒体服务2.4、启动 3、搭建GB28181视频直播平台 1、Redis服务 在LivGBS中Redis作为数据交换、数据订阅、数据发布的高速缓存…...
stm32产品架构
文章目录 前言一、pandas是什么?二、使用步骤 1.引入库2.读入数据 总结 前言 起因是我在看野火的ucosiii,然后他是基于i.mx芯片。然后我就很疑惑i.mx是什么芯片,看了下好像是ARM-M7(或者叫ARMCM7)架构的芯片。然后我又疑惑ARM-M7又是什么架…...

数据结构——双链表
双链表中节点类型的描述: 双链表的初始化(带头结点) 、 双链表的插入操作 后插操作 InsertNextDNode(p, s): 在p结点后插入s结点 按位序插入操作: 思路:从头结点开始,找到某个位序的前驱结点ÿ…...
Git 对文件名大小写不敏感的问题解决方案
目录 一、Git 对文件名大小写不敏感1.1 问题描述1.2 原因分析1.3 解决方案方式一:使用git命令进行修改方式二:关闭git 忽略大小写配置 (可以当前项目设置,也可以全局设置 --global) 二、新的问题(重复的目录…...

Java复习系列之阶段三:框架原理
1. Spring 1.1 核心功能 1. IOC容器 IOC,全称为控制反转(Inversion of Control),是一种软件设计原则,用于减少计算机代码之间的耦合度。控制反转的核心思想是将传统程序中对象的创建和绑定由程序代码直接控制转移到…...

【Python】01快速上手爬虫案例一:搞定豆瓣读书
文章目录 前言一、VSCodePython环境搭建二、爬虫案例一1、爬取第一页数据2、爬取所有页数据3、格式化html数据4、导出excel文件 前言 实战是最好的老师,直接案例操作,快速上手。 案例一,爬取数据,最终效果图: 一、VS…...

JavaEE 网络编程
JavaEE 网络编程 文章目录 JavaEE 网络编程引子1. 网络编程-相关概念1.1 基本概念1.2 发送端和接收端1.3 请求和响应1.4 客户端和服务端 2. Socket 套接字2.1 数据包套接字通信模型2.2 流套接字通信模型2.3 Socket编程注意事项 3. UDP数据报套接字编程3.1 DatagramSocket3.2 Da…...
5.rk3588用cv读取图片(C++)
rk3588自带了cv,不需要重新安装,执行以下操作即可: 一、读取图片 1.读取某张图片 #define HAVE_OPENCV_VIDEO #define HAVE_OPENCV_VIDEOIO#include <opencv2/opencv.hpp> #include <iostream> #include <opencv2/opencv.h…...

Github 无法正常访问?一招解决
查询IP网址: https://ip.chinaz.com/ 主页如下: 分别查询以下三个网址的IP: github.com github.global.ssl.fastly.net assets-cdn.github.com 修改 hosts 文件: 将 /etc/hosts 复制到 home 下 sudo cp /etc/hosts ./ gedit hosts 在底下…...

架构师的36项修炼-08系统的安全架构设计
本课时讲解系统的安全架构。 本节课主要讲 Web 的攻击与防护、信息的加解密与反垃圾。其中 Web 攻击方式包括 XSS 跨站点脚本攻击、SQL 注入攻击和 CSRF 跨站点请求伪造攻击;防护手段主要有消毒过滤、SQL 参数绑定、验证码和防火墙;加密手段,…...

华为云AI开发平台ModelArts
华为云ModelArts:重塑AI开发流程的“智能引擎”与“创新加速器”! 在人工智能浪潮席卷全球的2025年,企业拥抱AI的意愿空前高涨,但技术门槛高、流程复杂、资源投入巨大的现实,却让许多创新构想止步于实验室。数据科学家…...

云启出海,智联未来|阿里云网络「企业出海」系列客户沙龙上海站圆满落地
借阿里云中企出海大会的东风,以**「云启出海,智联未来|打造安全可靠的出海云网络引擎」为主题的阿里云企业出海客户沙龙云网络&安全专场于5.28日下午在上海顺利举办,现场吸引了来自携程、小红书、米哈游、哔哩哔哩、波克城市、…...

Nuxt.js 中的路由配置详解
Nuxt.js 通过其内置的路由系统简化了应用的路由配置,使得开发者可以轻松地管理页面导航和 URL 结构。路由配置主要涉及页面组件的组织、动态路由的设置以及路由元信息的配置。 自动路由生成 Nuxt.js 会根据 pages 目录下的文件结构自动生成路由配置。每个文件都会对…...
鸿蒙中用HarmonyOS SDK应用服务 HarmonyOS5开发一个医院查看报告小程序
一、开发环境准备 工具安装: 下载安装DevEco Studio 4.0(支持HarmonyOS 5)配置HarmonyOS SDK 5.0确保Node.js版本≥14 项目初始化: ohpm init harmony/hospital-report-app 二、核心功能模块实现 1. 报告列表…...

IT供电系统绝缘监测及故障定位解决方案
随着新能源的快速发展,光伏电站、储能系统及充电设备已广泛应用于现代能源网络。在光伏领域,IT供电系统凭借其持续供电性好、安全性高等优势成为光伏首选,但在长期运行中,例如老化、潮湿、隐裂、机械损伤等问题会影响光伏板绝缘层…...
rnn判断string中第一次出现a的下标
# coding:utf8 import torch import torch.nn as nn import numpy as np import random import json""" 基于pytorch的网络编写 实现一个RNN网络完成多分类任务 判断字符 a 第一次出现在字符串中的位置 """class TorchModel(nn.Module):def __in…...

Reasoning over Uncertain Text by Generative Large Language Models
https://ojs.aaai.org/index.php/AAAI/article/view/34674/36829https://ojs.aaai.org/index.php/AAAI/article/view/34674/36829 1. 概述 文本中的不确定性在许多语境中传达,从日常对话到特定领域的文档(例如医学文档)(Heritage 2013;Landmark、Gulbrandsen 和 Svenevei…...

保姆级教程:在无网络无显卡的Windows电脑的vscode本地部署deepseek
文章目录 1 前言2 部署流程2.1 准备工作2.2 Ollama2.2.1 使用有网络的电脑下载Ollama2.2.2 安装Ollama(有网络的电脑)2.2.3 安装Ollama(无网络的电脑)2.2.4 安装验证2.2.5 修改大模型安装位置2.2.6 下载Deepseek模型 2.3 将deepse…...

rknn toolkit2搭建和推理
安装Miniconda Miniconda - Anaconda Miniconda 选择一个 新的 版本 ,不用和RKNN的python版本保持一致 使用 ./xxx.sh进行安装 下面配置一下载源 # 清华大学源(最常用) conda config --add channels https://mirrors.tuna.tsinghua.edu.cn…...
DAY 26 函数专题1
函数定义与参数知识点回顾:1. 函数的定义2. 变量作用域:局部变量和全局变量3. 函数的参数类型:位置参数、默认参数、不定参数4. 传递参数的手段:关键词参数5 题目1:计算圆的面积 任务: 编写一…...