当前位置: 首页 > news >正文

java(kotlin)和 python 通过DoubleCloud的kafka进行线程间通信

进入

DoubleCloud
https://www.double.cloud
创建一个kafka
1 选择语言
2 运行curl 的url命令启动一个topic
3 生成对应语言的token
4 复制3中的配置文件到本地,命名为client.properties
5 复制客户端代码
对python和java客户端代码进行了重写,java改成了kotlin:

配置文件

# Required connection configs for Kafka producer, consumer, and admin
bootstrap.servers=
security.protocol=SASL_SSL
sasl.mechanisms=PLAIN
sasl.username=
sasl.password=
group.id=
auto.offset.reset=earliest
# Best practice for higher availability in librdkafka clients prior to 1.7
session.timeout.ms=45000
import timefrom confluent_kafka import Producer, Consumer
import asyncio
import threadingclass KafkaClient:def __init__(self, config_file):self.config = self.read_config(config_file)def read_config(self, config_file):config = {}with open(config_file) as fh:for line in fh:line = line.strip()if len(line) != 0 and line[0] != "#":parameter, value = line.strip().split('=', 1)config[parameter] = value.strip()return configdef produce(self, topic, key, value):# Creates a new producer instanceproducer = Producer(self.config)# Produces a sample messageproducer.produce(topic, key=key, value=value)print(f"Produced message to topic {topic}: key = {key:12} value = {value:12}")# Send any outstanding or buffered messages to the Kafka brokerproducer.flush()def consume_async(self, topic, callback=None, group_id="python-group-1", auto_offset_reset="earliest"):# Sets the consumer group ID and offsetself.config["group.id"] = group_idself.config["auto.offset.reset"] = auto_offset_resetconsumer = Consumer(self.config)consumer.subscribe([topic])loop = asyncio.new_event_loop()asyncio.set_event_loop(loop)if callback is not None:loop.run_until_complete(callback(consumer))def consume(self, topic, callback=None):thread = threading.Thread(target=self.consume_async, args=(topic, callback,))thread.start()return threadasync def consume_async(consumer):try:while True:msg = consumer.poll(1.0)if msg is not None:breakif not msg.error():key = msg.key().decode("utf-8")value = msg.value().decode("utf-8")print(f"Consumed message: key = {key:12} value = {value:12}")except KeyboardInterrupt:passfinally:consumer.close()config_file_path = ".\\client.properties"
topic = "test"
key = "key"
value = "value"kafka_client = KafkaClient(config_file_path)
kafka_client.produce(topic, key, value)
thread = kafka_client.consume(topic, consume_async)

配置文件

# Required connection configs for Kafka producer, consumer, and admin
bootstrap.servers=
security.protocol=SASL_SSL
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username='GHFXZDIOMQW3IPKA' password='TimUk7hj/EwTiB031lA95LeKfXN3t2Ddnw+izhKx3+7wFxZKMLGEqTOnneTKrlQQ';
sasl.mechanism=PLAIN
# Required for correctness in Apache Kafka clients prior to 2.6
client.dns.lookup=use_all_dns_ips
key.serializer=org.apache.kafka.common.serialization.StringSerializer
value.serializer=org.apache.kafka.common.serialization.StringSerializer
# Best practice for higher availability in Apache Kafka clients prior to 3.0
session.timeout.ms=45000
topic=
group.id=
auto.offset.reset=earliest
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer 
value.deserializer=org.apache.kafka.common.serialization.StringDeserializer 
# Best practice for Kafka producer to prevent data loss
acks=all

java(kotiln)


import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.newFixedThreadPoolContext
import org.apache.kafka.clients.consumer.ConsumerRecords
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.clients.producer.ProducerRecord
import java.io.Closeable
import java.io.FileInputStream
import java.io.IOException
import java.nio.file.Files
import java.nio.file.Paths
import java.time.Duration
import java.util.*class KafkaClient<T, V> : Closeable {private var producer: KafkaProducer<T, V>? = nullprivate var fileConfig: Properties? = nullval TOPIC = "topic"private val DURATION = 100Lprivate val POOLSIZE = 10private val DISPATCHER = newFixedThreadPoolContext(POOLSIZE, "CoroutinePool")private val SCOPE = CoroutineScope(DISPATCHER)constructor(configPath: String? = null, config: Properties? = null) {if (config == null && configPath == null) throw Exception("don't have any config")var config1 = Properties()if (configPath != null) {fileConfig = readConfig(configPath)fileConfig?.let { config1.putAll(it) }}if (config != null) {config1.putAll(config)}producer = KafkaProducer(config1)}fun produce(key: T, value: V, topic: String? = null) {producer?.send(ProducerRecord(topic ?: (fileConfig?.getProperty(TOPIC) as String), key, value))}fun consume(func: suspend (ConsumerRecords<T, V>) -> Unit) {val consumer: KafkaConsumer<T, V> = KafkaConsumer(fileConfig)consumer.subscribe(Arrays.asList(fileConfig?.getProperty(TOPIC)))SCOPE.launch {while (true) {val records: ConsumerRecords<T, V> = consumer.poll(Duration.ofMillis(DURATION))func(records)delay(DURATION)}}}@Throws(IOException::class)fun readConfig(configFile: String): Properties {if (!Files.exists(Paths.get(configFile))) {throw IOException("$configFile not found.")}val config = Properties()FileInputStream(configFile).use { inputStream -> config.load(inputStream) }return config}override fun close() {producer?.close()}
}fun main() {val cli =KafkaClient<String, String>("D:\\src\\main\\java\\com\\tr\\robot\\io\\kafka\\client.properties")cli.consume {println("test beg")for (record in it) {println(String.format("Consumed message from topic %s: key = %s value = %s", cli.TOPIC, record.key(), record.value()))}println("test end")}// Give some time for the consumer to startThread.sleep(2000)cli.produce("key1", "test")// Give some time for the consumer to consume the messageThread.sleep(5000)
}

相关文章:

java(kotlin)和 python 通过DoubleCloud的kafka进行线程间通信

进入 DoubleCloud https://www.double.cloud 创建一个kafka 1 选择语言 2 运行curl 的url命令启动一个topic 3 生成对应语言的token 4 复制3中的配置文件到本地&#xff0c;命名为client.properties 5 复制客户端代码 对python和java客户端代码进行了重写&#xff0c;java改成…...

vivado DIAGRAM、HW_AXI

图表 描述 块设计&#xff08;.bd&#xff09;是在IP中创建的互连IP核的复杂系统 Vivado设计套件的集成商。Vivado IP集成器可让您创建复杂的 通过实例化和互连Vivado IP目录中的IP进行系统设计。一块 设计是一种分层设计&#xff0c;可以写入磁盘上的文件&#xff08;.bd&…...

学习分享-为什么把后台的用户验证和认证逻辑放到网关

将后台的用户验证和认证逻辑放到网关&#xff08;API Gateway&#xff09;中是一种常见的设计模式&#xff0c;这种做法在微服务架构和现代应用中有许多优势和理由&#xff1a; 1. 集中管理认证和授权 统一的安全策略 在一个包含多个微服务的系统中&#xff0c;如果每个服务…...

27 ssh+scp+nfs+yum进阶

ssh远程管理 ssh是一种安全通道协议&#xff0c;用来实现字符界面的远程登录。远程复制&#xff0c;远程文本传输。 ssh对通信双方的数据进行了加密。 用户名和密码登录 密钥对认证方式&#xff08;可以实现免密登录&#xff09; ssh 22 网络层 传输层 数据传输的过程中是…...

LabVIEW液压伺服压力机控制系统与控制频率选择

液压伺服压力机的控制频率是一个重要的参数&#xff0c;它直接影响系统的响应速度、稳定性和控制精度。具体选择的控制频率取决于多种因素&#xff0c;包括系统的动态特性、控制目标、硬件性能以及应用场景。以下是一些常见的指导原则和考量因素&#xff1a; 常见的控制频率范…...

阿里云(域名解析) certbot 证书配置

1、安装 certbot ubuntu 系统&#xff1a; sudo apt install certbot 2、申请certbot 域名证书&#xff0c;如申请二级域名aa.example.com 的ssl证书&#xff0c;同时需要让 bb.aa.example.com 也可以使用此证书 1、命令&#xff1a;sudo certbot certonly -d “域名” -d “…...

Web LLM 攻击技术

概述 在ChatGPT问世以来&#xff0c;我也尝试挖掘过ChatGPT的漏洞&#xff0c;不过仅仅发现过一些小问题&#xff1a;无法显示xml的bug和错误信息泄露&#xff0c;虽然也挖到过一些开源LLM的漏洞&#xff0c;比如前段时间发现的Jan的漏洞&#xff0c;但是不得不说传统漏洞越来…...

Java等待异步线程池跑完再执行指定方法的三种方式(condition、CountDownLatch、CyclicBarrier)

Java等待异步线程池跑完再执行指定方法的三种方式(condition、CountDownLatch、CyclicBarrier) Async如何使用 使用Async标注在方法上&#xff0c;可以使该方法异步的调用执行。而所有异步方法的实际执行是交给TaskExecutor的。 1.启动类添加EnableAsync注解 2. 方法上添加A…...

秒杀优化+秒杀安全

1.Redis预减库存 1.OrderServiceImpl.java 问题分析 2.具体实现 SeckillController.java 1.实现InitializingBean接口的afterPropertiesSet方法&#xff0c;在bean初始化之后将库存信息加载到Redis /*** 系统初始化&#xff0c;将秒杀商品库存加载到redis中** throws Excepti…...

48、Flink 的 Data Source API 详解

a&#xff09;概述 本节将描述 FLIP-27 中引入的新 Source API 的主要接口。 b&#xff09;Source Source API 是一个工厂模式的接口&#xff0c;用于创建以下组件。 Split EnumeratorSource ReaderSplit SerializerEnumerator Checkpoint Serializer 此外&#xff0c;Sou…...

深入解析Java扩展机制:SPI与Spring.factories

目录 Java SPI概述 1.1 什么是SPI&#xff1f;1.2 SPI的工作原理1.3 SPI的优缺点 SPI的应用 2.1 Java标准库中的SPI应用2.2 自定义SPI示例 Spring.factories概述 3.1 什么是spring.factories&#xff1f;3.2 spring.factories的工作原理3.3 spring.factories的优缺点 spring.f…...

Vue2之模板语法

文章目录 1.模板语法1.1 插值语法{{}}可以写什么1.2 指令语法1.2.1 指令概述1.2.2 v-bind指令1.2.3 v-model指令 1.模板语法 1.1 插值语法{{}}可以写什么 &#xff08;1&#xff09;在data中声明的 &#xff08;2&#xff09;常量 &#xff08;3&#xff09;合法的JavaScript…...

java基础练习题

1、一个".java"源文件中是否可以包括多个类&#xff1f;有什么限制&#xff1f; 可以包含多个类。但是只有一个类可以声明为public&#xff0c;且要求声明为public的类的类名与源文件名相同。 2、java的优势&#xff1f; a、跨平台性 b、安全性高 c、简单性 d、…...

unity中通过实现底层接口实现非按钮(图片)的事件监听

编写监听脚本 PEListenter 继承自MonoBehaviour类&#xff0c;并实现了IPointerDownHandler、IPointerUpHandler和IDragHandler接口&#xff0c;按照需求定义需要接收事件&#xff08;鼠标按下、抬起、拖拽&#xff09;的回调函数 //监听类&#xff08;需要挂载在物体上面&am…...

重庆耶非凡科技有限公司的选品师项目加盟靠谱吗?

在当今电子商务的浪潮中&#xff0c;选品师的角色愈发重要。而重庆耶非凡科技有限公司以其独特的选品师项目&#xff0c;在行业内引起了广泛关注。对于想要加盟该项目的人来说&#xff0c;项目的靠谱性无疑是首要考虑的问题。 首先&#xff0c;我们来看看耶非凡科技有限公司的背…...

《青少年编程与数学》课程方案:4、课程策略

《青少年编程与数学》课程方案&#xff1a;4、课程策略 一、工程师思维二、使命感驱动三、价值观引领四、学习现代化五、工作生活化六、与时代共进 《青少年编程与数学》课程策略强调采用工程师思维&#xff0c;避免重复造轮子&#xff0c;培养使命感&#xff0c;通过探索兴趣、…...

用爬虫实现---模拟填志愿

先来说实现逻辑&#xff0c;首先我要获取到这个网站上所有的信息&#xff0c;那么我们就可以开始对元素进行检查 我们发现他的每一个学校信息都有一个对应的属性&#xff0c;并且是相同的&#xff0c;那么我们就可以遍历这个网页中的所有属性一样的开始爬取 在来分析&#xff0…...

vscode Run Code输出出现中文乱码情况问题解决方案

主要解决方案是通过修改计算机默认的编码格式,来完成的。 chcp 是 Windows 操作系统中的一个命令,用于显示或设置控制台的代码页(code page)。代码页决定了控制台如何解释和显示字符,特别是非 ASCII 字符(例如 Unicode 字符)。 使用方法 显示当前代码页: 输入 chcp 而…...

代码随想录训练营Day30

提示&#xff1a;文章写完后&#xff0c;目录可以自动生成&#xff0c;如何生成可参考右边的帮助文档 文章目录 前言一、重新安排行程 前言 提示&#xff1a;这里可以添加本文要记录的大概内容&#xff1a; 今天是跟着代码随想录刷题的第30天&#xff0c;主要是复习了回溯算法…...

Swift 序列(Sequence)排序面面俱到 - 从过去到现在(二)

概览 在上篇 Swift 序列(Sequence)排序面面俱到 - 从过去到现在(一)博文中,我们讨论了 Swift 语言中序列和集合元素排序的一些基本知识,我们还给出了以自定义类型中任意属性排序的“康庄大道”。 不过在实际的撸码场景中,我们往往需要的是“多属性”同时参与到排序的考…...

STM32F103C8T6基于HAL库移植uC/OS-III

文章目录 一、建立STM32CubeMX工程二、移植1、 uC/OS-III源码2、移植过程 三、配置相关代码1、bsp.c和bsp.h2、main.c3、修改启动代码4、修改app_cfg.h文件5、修改includes.h文件6、修改lib_cfg.h文件 四、编译与烧录总结参考资料 学习嵌入式实时操作系统&#xff08;RTOS&…...

微服务学习Day9-分布式事务Seata

文章目录 分布式事务seata引入理论基础CAP定理BASE理论 初识Seata动手实践XA模式AT模式TCC模式SAGA模式 高可用 分布式事务seata 引入 理论基础 CAP定理 BASE理论 初识Seata 动手实践 XA模式 AT模式 TCC模式 Service Slf4j public class AccountTCCServiceImpl implements A…...

vue用vite配置代理解决跨域问题(target、rewrite和changeOrigin的使用场景)

Vite的target、rewrite和changeOrigin的使用场景 1. target 使用场景&#xff1a;target 属性在 Vite 的 vite.config.ts 或 vite.config.js 文件的 server.proxy 配置中指定&#xff0c;用于设置代理服务器应该将请求转发到的目标地址。这通常是一个后端服务的API接口地址。…...

为什么PPT录制没有声音 电脑ppt录屏没有声音怎么办

一、为什么PPT录制没有声音 1.软件问题 我们下载软件的时候可能遇到软件损坏的问题&#xff0c;导致录制没有声音&#xff0c;但其他功能还是可以使用的。我建议使用PPT的隐藏功能&#xff0c;下载插件&#xff0c;在PPT界面的加载项选项卡中就能使用。我推荐一款可以解决录屏…...

JDBC学习笔记(三)高级篇

一、JDBC 优化及工具类封装 1.1 现有问题 1.2 JDBC 工具类封装 V1.0 resources/db.properties配置文件&#xff1a; driverClassNamecom.mysql.cj.jdbc.Driver urljdbc:mysql:///atguigu usernameroot password123456 initialSize10 maxActive20 工具类代码&#xff1a; p…...

c++编译器在什么情况下会提供类的默认构造函数等,与析构函数

我们都知道&#xff0c;在 c 里&#xff0c;编写的简单类&#xff0c;若没有自己编写构造析构函数与 copy 构造函数 与 赋值运算符函数&#xff0c;那么编译器会提供这些函数&#xff0c;并实现简单的语义&#xff0c;比如成员赋值。看 源码时&#xff0c;出现了下图类似的情形…...

SpringBoot3整合Mybatis-Plus3.5.5出现的问题

主要是由于 mybatis-plus 中 mybatis 的整合包版本不够导致的 排除 mybatis-plus 中自带的 mybatis 整合包&#xff0c;单独引入即可 java.lang.IllegalArgumentException: Invalid value type for attribute factoryBeanObjectType: java.lang.Stringat org.springframework.…...

服务器数据恢复—强制上线raid5阵列离线硬盘导致raid不可用的数据恢复案例

服务器数据恢复环境&#xff1a; 某品牌2850服务器中有一组由6块SCSI硬盘组建的raid5磁盘阵列&#xff0c;linux操作系统ext3文件系统。 服务器故障&#xff1a; 服务器运行过程中突然瘫痪。服务器管理员检查阵列后发现raid5阵列中有两块硬盘离线&#xff0c;将其中一块硬盘进行…...

初入阿里云,上手走一波

初入阿里云&#xff0c;上手走一波 一阶&#xff1a;ECSMysqlDMS安装Mysql初始化MysqlMysql操作DMS管理Mysql 二阶&#xff1a;ECSOSS远程连接ECSOSS控制台其他图片服务 三阶&#xff1a;更多搭配操作 可以说个人在日常使用过程中&#xff0c;操作最多的阿里云产品就是阿里云服…...

[C++] 小游戏 斗破苍穹 2.2.1至2.11.5所有版本(中) zty出品

目录 2.8.2 2.9.1 2.10.1 2.10.2 2.10.3 2.10.4 2.10.5 2.8.2 #include<stdio.h> #include<iostream> #include<ctime> #include<bits/stdc.h> #include<time.h> //suiji #include<windows.h> //SLEEP函数 using namespace std; st…...

什么叫网站建设方案书/中国疫情最新数据

zookeeper是一个开源的分布式协调服务&#xff0c;是由雅虎创建的&#xff0c;基于google chubby。是一种分布式数据一致性的解决方案。一、zookeeper的特性顺序一致性 从同一个客户端发起的事务请求&#xff0c;最终会严格按照顺序被应用到zookeeper中。原子性 所有的事务请求…...

wordpress 去掉作者/百度托管运营哪家好

javascript06 \d 匹配数字 [4-9] 控制区间 [4567] 只能匹配出现数字的一次 X? 一次或者一次也没有 X* 零次或者多次 X 一次或者多次[即不能为空] //表示次数 X{n] 恰好n次 X{n,} 至少n次 x{n,m} 至少n次,最多m次 $ X 字符串必须以结尾 ^a 字符串必须以a打头 JS中判断用te…...

住房和城乡建设部网站事故快报/百度搜索引擎属于什么引擎

http://videolectures.net/Top/Computer_Science/Machine_Learning/##tvtt转载于:https://www.cnblogs.com/stoneresearch/archive/2010/08/22/4336500.html...

17网站一起做网店 发货慢/最新seo教程

2019独角兽企业重金招聘Python工程师标准>>> 过度使用同步会导致性能低下、死锁或其他不确定问题 在一个同步方法或代码块中&#xff0c;不要放弃对客户端的控制 即&#xff1a;在一个同步区域内部&#xff0c;不要调用被覆盖方法&#xff0c;或者是传入对象提供的方…...

唐山网站建设策划方案/小红书关键词热度查询

1.1 创建信号量int semget( key_t key, //标识信号量的关键字&#xff0c;有三种方法&#xff1a;1、使用IPC——PRIVATE让系统产生&#xff0c; // 2、挑选一个随机数&#xff0c;3、使用ftok从文件路径名中产生 int nSemes, //信号量集中元素个数 int flag …...

企业网站改版建议/广东seo推广

目标&#xff1a;提高结构化思维&#xff0c;书面表达&#xff0c;公文写作&#xff0c;口头(说话、演讲、讲课)表达能力 谁是我的听众&#xff1f;他们想听什么&#xff1f;他们想怎么听&#xff1f;金字塔原理的基本结构是&#xff1a;结论先行&#xff0c;以上统下&#xff…...