java(kotlin)和 python 通过DoubleCloud的kafka进行线程间通信
进入
DoubleCloud
https://www.double.cloud
创建一个kafka
1 选择语言
2 运行curl 的url命令启动一个topic
3 生成对应语言的token
4 复制3中的配置文件到本地,命名为client.properties
5 复制客户端代码
对python和java客户端代码进行了重写,java改成了kotlin:
配置文件
# Required connection configs for Kafka producer, consumer, and admin
bootstrap.servers=
security.protocol=SASL_SSL
sasl.mechanisms=PLAIN
sasl.username=
sasl.password=
group.id=
auto.offset.reset=earliest
# Best practice for higher availability in librdkafka clients prior to 1.7
session.timeout.ms=45000
import timefrom confluent_kafka import Producer, Consumer
import asyncio
import threadingclass KafkaClient:def __init__(self, config_file):self.config = self.read_config(config_file)def read_config(self, config_file):config = {}with open(config_file) as fh:for line in fh:line = line.strip()if len(line) != 0 and line[0] != "#":parameter, value = line.strip().split('=', 1)config[parameter] = value.strip()return configdef produce(self, topic, key, value):# Creates a new producer instanceproducer = Producer(self.config)# Produces a sample messageproducer.produce(topic, key=key, value=value)print(f"Produced message to topic {topic}: key = {key:12} value = {value:12}")# Send any outstanding or buffered messages to the Kafka brokerproducer.flush()def consume_async(self, topic, callback=None, group_id="python-group-1", auto_offset_reset="earliest"):# Sets the consumer group ID and offsetself.config["group.id"] = group_idself.config["auto.offset.reset"] = auto_offset_resetconsumer = Consumer(self.config)consumer.subscribe([topic])loop = asyncio.new_event_loop()asyncio.set_event_loop(loop)if callback is not None:loop.run_until_complete(callback(consumer))def consume(self, topic, callback=None):thread = threading.Thread(target=self.consume_async, args=(topic, callback,))thread.start()return threadasync def consume_async(consumer):try:while True:msg = consumer.poll(1.0)if msg is not None:breakif not msg.error():key = msg.key().decode("utf-8")value = msg.value().decode("utf-8")print(f"Consumed message: key = {key:12} value = {value:12}")except KeyboardInterrupt:passfinally:consumer.close()config_file_path = ".\\client.properties"
topic = "test"
key = "key"
value = "value"kafka_client = KafkaClient(config_file_path)
kafka_client.produce(topic, key, value)
thread = kafka_client.consume(topic, consume_async)
配置文件
# Required connection configs for Kafka producer, consumer, and admin
bootstrap.servers=
security.protocol=SASL_SSL
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username='GHFXZDIOMQW3IPKA' password='TimUk7hj/EwTiB031lA95LeKfXN3t2Ddnw+izhKx3+7wFxZKMLGEqTOnneTKrlQQ';
sasl.mechanism=PLAIN
# Required for correctness in Apache Kafka clients prior to 2.6
client.dns.lookup=use_all_dns_ips
key.serializer=org.apache.kafka.common.serialization.StringSerializer
value.serializer=org.apache.kafka.common.serialization.StringSerializer
# Best practice for higher availability in Apache Kafka clients prior to 3.0
session.timeout.ms=45000
topic=
group.id=
auto.offset.reset=earliest
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
# Best practice for Kafka producer to prevent data loss
acks=all
java(kotiln)
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.newFixedThreadPoolContext
import org.apache.kafka.clients.consumer.ConsumerRecords
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.clients.producer.ProducerRecord
import java.io.Closeable
import java.io.FileInputStream
import java.io.IOException
import java.nio.file.Files
import java.nio.file.Paths
import java.time.Duration
import java.util.*class KafkaClient<T, V> : Closeable {private var producer: KafkaProducer<T, V>? = nullprivate var fileConfig: Properties? = nullval TOPIC = "topic"private val DURATION = 100Lprivate val POOLSIZE = 10private val DISPATCHER = newFixedThreadPoolContext(POOLSIZE, "CoroutinePool")private val SCOPE = CoroutineScope(DISPATCHER)constructor(configPath: String? = null, config: Properties? = null) {if (config == null && configPath == null) throw Exception("don't have any config")var config1 = Properties()if (configPath != null) {fileConfig = readConfig(configPath)fileConfig?.let { config1.putAll(it) }}if (config != null) {config1.putAll(config)}producer = KafkaProducer(config1)}fun produce(key: T, value: V, topic: String? = null) {producer?.send(ProducerRecord(topic ?: (fileConfig?.getProperty(TOPIC) as String), key, value))}fun consume(func: suspend (ConsumerRecords<T, V>) -> Unit) {val consumer: KafkaConsumer<T, V> = KafkaConsumer(fileConfig)consumer.subscribe(Arrays.asList(fileConfig?.getProperty(TOPIC)))SCOPE.launch {while (true) {val records: ConsumerRecords<T, V> = consumer.poll(Duration.ofMillis(DURATION))func(records)delay(DURATION)}}}@Throws(IOException::class)fun readConfig(configFile: String): Properties {if (!Files.exists(Paths.get(configFile))) {throw IOException("$configFile not found.")}val config = Properties()FileInputStream(configFile).use { inputStream -> config.load(inputStream) }return config}override fun close() {producer?.close()}
}fun main() {val cli =KafkaClient<String, String>("D:\\src\\main\\java\\com\\tr\\robot\\io\\kafka\\client.properties")cli.consume {println("test beg")for (record in it) {println(String.format("Consumed message from topic %s: key = %s value = %s", cli.TOPIC, record.key(), record.value()))}println("test end")}// Give some time for the consumer to startThread.sleep(2000)cli.produce("key1", "test")// Give some time for the consumer to consume the messageThread.sleep(5000)
}相关文章:
java(kotlin)和 python 通过DoubleCloud的kafka进行线程间通信
进入 DoubleCloud https://www.double.cloud 创建一个kafka 1 选择语言 2 运行curl 的url命令启动一个topic 3 生成对应语言的token 4 复制3中的配置文件到本地,命名为client.properties 5 复制客户端代码 对python和java客户端代码进行了重写,java改成…...
vivado DIAGRAM、HW_AXI
图表 描述 块设计(.bd)是在IP中创建的互连IP核的复杂系统 Vivado设计套件的集成商。Vivado IP集成器可让您创建复杂的 通过实例化和互连Vivado IP目录中的IP进行系统设计。一块 设计是一种分层设计,可以写入磁盘上的文件(.bd&…...
学习分享-为什么把后台的用户验证和认证逻辑放到网关
将后台的用户验证和认证逻辑放到网关(API Gateway)中是一种常见的设计模式,这种做法在微服务架构和现代应用中有许多优势和理由: 1. 集中管理认证和授权 统一的安全策略 在一个包含多个微服务的系统中,如果每个服务…...
27 ssh+scp+nfs+yum进阶
ssh远程管理 ssh是一种安全通道协议,用来实现字符界面的远程登录。远程复制,远程文本传输。 ssh对通信双方的数据进行了加密。 用户名和密码登录 密钥对认证方式(可以实现免密登录) ssh 22 网络层 传输层 数据传输的过程中是…...
LabVIEW液压伺服压力机控制系统与控制频率选择
液压伺服压力机的控制频率是一个重要的参数,它直接影响系统的响应速度、稳定性和控制精度。具体选择的控制频率取决于多种因素,包括系统的动态特性、控制目标、硬件性能以及应用场景。以下是一些常见的指导原则和考量因素: 常见的控制频率范…...
阿里云(域名解析) certbot 证书配置
1、安装 certbot ubuntu 系统: sudo apt install certbot 2、申请certbot 域名证书,如申请二级域名aa.example.com 的ssl证书,同时需要让 bb.aa.example.com 也可以使用此证书 1、命令:sudo certbot certonly -d “域名” -d “…...
Web LLM 攻击技术
概述 在ChatGPT问世以来,我也尝试挖掘过ChatGPT的漏洞,不过仅仅发现过一些小问题:无法显示xml的bug和错误信息泄露,虽然也挖到过一些开源LLM的漏洞,比如前段时间发现的Jan的漏洞,但是不得不说传统漏洞越来…...
Java等待异步线程池跑完再执行指定方法的三种方式(condition、CountDownLatch、CyclicBarrier)
Java等待异步线程池跑完再执行指定方法的三种方式(condition、CountDownLatch、CyclicBarrier) Async如何使用 使用Async标注在方法上,可以使该方法异步的调用执行。而所有异步方法的实际执行是交给TaskExecutor的。 1.启动类添加EnableAsync注解 2. 方法上添加A…...
秒杀优化+秒杀安全
1.Redis预减库存 1.OrderServiceImpl.java 问题分析 2.具体实现 SeckillController.java 1.实现InitializingBean接口的afterPropertiesSet方法,在bean初始化之后将库存信息加载到Redis /*** 系统初始化,将秒杀商品库存加载到redis中** throws Excepti…...
48、Flink 的 Data Source API 详解
a)概述 本节将描述 FLIP-27 中引入的新 Source API 的主要接口。 b)Source Source API 是一个工厂模式的接口,用于创建以下组件。 Split EnumeratorSource ReaderSplit SerializerEnumerator Checkpoint Serializer 此外,Sou…...
深入解析Java扩展机制:SPI与Spring.factories
目录 Java SPI概述 1.1 什么是SPI?1.2 SPI的工作原理1.3 SPI的优缺点 SPI的应用 2.1 Java标准库中的SPI应用2.2 自定义SPI示例 Spring.factories概述 3.1 什么是spring.factories?3.2 spring.factories的工作原理3.3 spring.factories的优缺点 spring.f…...
Vue2之模板语法
文章目录 1.模板语法1.1 插值语法{{}}可以写什么1.2 指令语法1.2.1 指令概述1.2.2 v-bind指令1.2.3 v-model指令 1.模板语法 1.1 插值语法{{}}可以写什么 (1)在data中声明的 (2)常量 (3)合法的JavaScript…...
java基础练习题
1、一个".java"源文件中是否可以包括多个类?有什么限制? 可以包含多个类。但是只有一个类可以声明为public,且要求声明为public的类的类名与源文件名相同。 2、java的优势? a、跨平台性 b、安全性高 c、简单性 d、…...
unity中通过实现底层接口实现非按钮(图片)的事件监听
编写监听脚本 PEListenter 继承自MonoBehaviour类,并实现了IPointerDownHandler、IPointerUpHandler和IDragHandler接口,按照需求定义需要接收事件(鼠标按下、抬起、拖拽)的回调函数 //监听类(需要挂载在物体上面&am…...
重庆耶非凡科技有限公司的选品师项目加盟靠谱吗?
在当今电子商务的浪潮中,选品师的角色愈发重要。而重庆耶非凡科技有限公司以其独特的选品师项目,在行业内引起了广泛关注。对于想要加盟该项目的人来说,项目的靠谱性无疑是首要考虑的问题。 首先,我们来看看耶非凡科技有限公司的背…...
《青少年编程与数学》课程方案:4、课程策略
《青少年编程与数学》课程方案:4、课程策略 一、工程师思维二、使命感驱动三、价值观引领四、学习现代化五、工作生活化六、与时代共进 《青少年编程与数学》课程策略强调采用工程师思维,避免重复造轮子,培养使命感,通过探索兴趣、…...
用爬虫实现---模拟填志愿
先来说实现逻辑,首先我要获取到这个网站上所有的信息,那么我们就可以开始对元素进行检查 我们发现他的每一个学校信息都有一个对应的属性,并且是相同的,那么我们就可以遍历这个网页中的所有属性一样的开始爬取 在来分析࿰…...
vscode Run Code输出出现中文乱码情况问题解决方案
主要解决方案是通过修改计算机默认的编码格式,来完成的。 chcp 是 Windows 操作系统中的一个命令,用于显示或设置控制台的代码页(code page)。代码页决定了控制台如何解释和显示字符,特别是非 ASCII 字符(例如 Unicode 字符)。 使用方法 显示当前代码页: 输入 chcp 而…...
代码随想录训练营Day30
提示:文章写完后,目录可以自动生成,如何生成可参考右边的帮助文档 文章目录 前言一、重新安排行程 前言 提示:这里可以添加本文要记录的大概内容: 今天是跟着代码随想录刷题的第30天,主要是复习了回溯算法…...
Swift 序列(Sequence)排序面面俱到 - 从过去到现在(二)
概览 在上篇 Swift 序列(Sequence)排序面面俱到 - 从过去到现在(一)博文中,我们讨论了 Swift 语言中序列和集合元素排序的一些基本知识,我们还给出了以自定义类型中任意属性排序的“康庄大道”。 不过在实际的撸码场景中,我们往往需要的是“多属性”同时参与到排序的考…...
XML Group端口详解
在XML数据映射过程中,经常需要对数据进行分组聚合操作。例如,当处理包含多个物料明细的XML文件时,可能需要将相同物料号的明细归为一组,或对相同物料号的数量进行求和计算。传统实现方式通常需要编写脚本代码,增加了开…...
多云管理“拦路虎”:深入解析网络互联、身份同步与成本可视化的技术复杂度
一、引言:多云环境的技术复杂性本质 企业采用多云策略已从技术选型升维至生存刚需。当业务系统分散部署在多个云平台时,基础设施的技术债呈现指数级积累。网络连接、身份认证、成本管理这三大核心挑战相互嵌套:跨云网络构建数据…...
深入剖析AI大模型:大模型时代的 Prompt 工程全解析
今天聊的内容,我认为是AI开发里面非常重要的内容。它在AI开发里无处不在,当你对 AI 助手说 "用李白的风格写一首关于人工智能的诗",或者让翻译模型 "将这段合同翻译成商务日语" 时,输入的这句话就是 Prompt。…...
2.Vue编写一个app
1.src中重要的组成 1.1main.ts // 引入createApp用于创建应用 import { createApp } from "vue"; // 引用App根组件 import App from ./App.vue;createApp(App).mount(#app)1.2 App.vue 其中要写三种标签 <template> <!--html--> </template>…...
[ICLR 2022]How Much Can CLIP Benefit Vision-and-Language Tasks?
论文网址:pdf 英文是纯手打的!论文原文的summarizing and paraphrasing。可能会出现难以避免的拼写错误和语法错误,若有发现欢迎评论指正!文章偏向于笔记,谨慎食用 目录 1. 心得 2. 论文逐段精读 2.1. Abstract 2…...
MySQL中【正则表达式】用法
MySQL 中正则表达式通过 REGEXP 或 RLIKE 操作符实现(两者等价),用于在 WHERE 子句中进行复杂的字符串模式匹配。以下是核心用法和示例: 一、基础语法 SELECT column_name FROM table_name WHERE column_name REGEXP pattern; …...
LeetCode - 199. 二叉树的右视图
题目 199. 二叉树的右视图 - 力扣(LeetCode) 思路 右视图是指从树的右侧看,对于每一层,只能看到该层最右边的节点。实现思路是: 使用深度优先搜索(DFS)按照"根-右-左"的顺序遍历树记录每个节点的深度对于…...
怎么让Comfyui导出的图像不包含工作流信息,
为了数据安全,让Comfyui导出的图像不包含工作流信息,导出的图像就不会拖到comfyui中加载出来工作流。 ComfyUI的目录下node.py 直接移除 pnginfo(推荐) 在 save_images 方法中,删除或注释掉所有与 metadata …...
Linux系统部署KES
1、安装准备 1.版本说明V008R006C009B0014 V008:是version产品的大版本。 R006:是release产品特性版本。 C009:是通用版 B0014:是build开发过程中的构建版本2.硬件要求 #安全版和企业版 内存:1GB 以上 硬盘…...
【51单片机】4. 模块化编程与LCD1602Debug
1. 什么是模块化编程 传统编程会将所有函数放在main.c中,如果使用的模块多,一个文件内会有很多代码,不利于组织和管理 模块化编程则是将各个模块的代码放在不同的.c文件里,在.h文件里提供外部可调用函数声明,其他.c文…...
