当前位置: 首页 > news >正文

三、Kafka生产者

目录

    • 3.1 生产者消息发送流程
      • 3.1.1 发送原理
    • 3.2 异步发送 API
    • 3.3 同步发送数据
    • 3.4 生产者分区
      • 3.4.1 kafka分区的好处
      • 3.4.2 生产者发送消息的分区策略
      • 3.4.3 自定义分区器
    • 3.5 生产者如何提高吞吐量
    • 3.6 数据可靠性

3.1 生产者消息发送流程

3.1.1 发送原理

3.2 异步发送 API

3.3 同步发送数据

3.4 生产者分区

3.4.1 kafka分区的好处

  • 便于合理使用存储资源,每个Partition在一个Broker上存储,可以把海量的数据按照分区切割成一块一块数据存储在多台Broker上。合理控制分区的任务,可以实现负载均衡的效果
  • 提高并行度,生产者可以以分区为单位发送数据;消费者可以以分区为单位进行消费数据。

3.4.2 生产者发送消息的分区策略

在这里插入图片描述

3.4.3 自定义分区器

1、需求:
例如我们实现一个分区器实现,发送过来的数据中如果包含 atguigu,就发往 0 号分区,不包含 atguigu,就发往 1 号分区

2、定义类实现 Partitioner 接口,重写 partition()方法。

public class MyPartitioner implements Partitioner {@Overridepublic int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {// 获取数据 atguigu  helloString msgValues = value.toString();int partition;if (msgValues.contains("atguigu")){partition = 0;}else {partition = 1;}return partition;}@Overridepublic void close() {}@Overridepublic void configure(Map<String, ?> configs) {}
}

3、使用分区器的方法,在生产者的配置中添加分区器参数

public class CustomProducerCallbackPartitions {public static void main(String[] args) throws InterruptedException {// 0 配置Properties properties = new Properties();// 连接集群 bootstrap.serversproperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.239.11:9092");// 指定对应的key和value的序列化类型 key.serializer
//        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());// 关联自定义分区器properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "com.atguigu.kafka.producer.MyPartitioner");// 1 创建kafka生产者对象// "" helloKafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);// 2 发送数据for (int i = 0; i < 5; i++) {kafkaProducer.send(new ProducerRecord<>("first", "atguigu" + i), new Callback() {@Overridepublic void onCompletion(RecordMetadata metadata, Exception exception) {if (exception == null) {System.out.println("主题: " + metadata.topic() + " 分区: " + metadata.partition());}}});Thread.sleep(2);}// 3 关闭资源kafkaProducer.close();}
}

在这里插入图片描述



3.5 生产者如何提高吞吐量

  • 分批次发送消息
  • 对生产者消息采用压缩

四个重要参数:
在这里插入图片描述

public class CustomProducerParameters {public static void main(String[] args) {// 0 配置Properties properties = new Properties();// 连接kafka集群properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.239.11:9092");// 序列化properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());// 缓冲区大小properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG,33554432);// 批次大小properties.put(ProducerConfig.BATCH_SIZE_CONFIG,16384);// linger.msproperties.put(ProducerConfig.LINGER_MS_CONFIG, 1);// 压缩properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG,"snappy");// 1 创建生产者KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);// 2 发送数据for (int i = 0; i < 50; i++) {kafkaProducer.send(new ProducerRecord<>("first","atguigu"+i));}// 3 关闭资源kafkaProducer.close();}
}

3.6 数据可靠性

相关文章:

三、Kafka生产者

目录 3.1 生产者消息发送流程3.1.1 发送原理 3.2 异步发送 API3.3 同步发送数据3.4 生产者分区3.4.1 kafka分区的好处3.4.2 生产者发送消息的分区策略3.4.3 自定义分区器 3.5 生产者如何提高吞吐量3.6 数据可靠性 3.1 生产者消息发送流程 3.1.1 发送原理 3.2 异步发送 API 3…...

【SA8295P 源码分析】19 - QNX Host NFS 文件系统配置

【SA8295P 源码分析】19 - QNX Host NFS 文件系统配置 一、NFS Server二、NFS Client三、NFS 相关的文件及目录四、将文件放入QNX 文件系统中五、编译下载验证系列文章汇总见:《【SA8295P 源码分析】00 - 系列文章链接汇总》 本文链接:《【SA8295P 源码分析】19 - QNX Host N…...

JRE、JDK、JVM及JIT之间有什么不同?_java基础知识总结

当涉及Java编程和执行时&#xff0c;以下术语具有不同的含义&#xff1a; 1.JRE (Java Runtime Environment) JRE是Java运行时环境的缩写。它是一个包含用于在计算机上运行Java应用程序所需的组件集合。JRE包括了以下几个主要部分&#xff1a; Java虚拟机(JVM)&#xff1a;用…...

sqlite3数据库的实现

sqlite3代码实现数据库的插入、删除、修改、退出功能 #include <head.h> #include <sqlite3.h> #include <unistd.h> int do_insert(sqlite3 *db); int do_delete(sqlite3 *db); int do_update(sqlite3 *db);int main(int argc, const char *argv[]) {sqlit…...

c#设计模式-结构型模式 之 桥接模式

前言 桥接模式是一种设计模式&#xff0c;它将抽象与实现分离&#xff0c;使它们可以独立变化。这种模式涉及到一个接口作为桥梁&#xff0c;使实体类的功能独立于接口实现类。这两种类型的类可以结构化改变而互不影响。 桥接模式的主要目的是通过将实现和抽象分离&#xff0c;…...

【Vue-Router】导航守卫

前置守卫 main.ts import { createApp } from vue import App from ./App.vue import {router} from ./router // import 引入 import ElementPlus from element-plus import element-plus/dist/index.css const app createApp(App) app.use(router) // use 注入 ElementPlu…...

07无监督学习——降维

1.降维的概述 维数灾难(Curse of Dimensionality):通常是指在涉及到向量的计算的问题中&#xff0c;随着维数的增加&#xff0c;计算量呈指数倍增长的一种现象。 1.1什么是降维&#xff1f; 1.降维(Dimensionality Reduction)是将训练数据中的样本(实例)从高维空间转换到低维…...

系列七、IOC操作bean管理(xml自动装配)

一、概述 自动装配是根据指定规则&#xff08;属性名称或者属性类型&#xff09;&#xff0c;Spring自动将匹配的属性值进行注入。 二、分类 xml自动装配分为按照属性名称自动装配&#xff08;byName&#xff09;和按照属性类型自动装配&#xff08;byType&#xff09;。 2.1…...

01- vdom 和模板编译源码

组件渲染的过程 template --> ast --> render --> vDom --> 真实的Dom --> 页面 Runtime-Compiler和Runtime-Only的区别 - 简书 编译步骤 模板编译是Vue中比较核心的一部分。关于 Vue 编译原理这块的整体逻辑主要分三个部分&#xff0c;也可以说是分三步&am…...

C++入门知识点——解决C语言不足

&#x1f636;‍&#x1f32b;️Take your time ! &#x1f636;‍&#x1f32b;️ &#x1f4a5;个人主页&#xff1a;&#x1f525;&#x1f525;&#x1f525;大魔王&#x1f525;&#x1f525;&#x1f525; &#x1f4a5;代码仓库&#xff1a;&#x1f525;&#x1f525;魔…...

探秘分布式大数据:融合专业洞见,燃起趣味火花,启迪玄幻思维

文章目录 一 数据导论二 大数据的诞生三 大数据概论3.1 大数据的5V特征3.2 大数据的工作核心 四 大数据软件生态4.1 数据存储软件4.2 数据计算软件4.3 数据传输软件 五 Apache Hadoop概述5.1 Apache Hadoop框架5.2 Hadoop的功能5.3 Hadoop的发展5.4 Hadoop发行版本 一 数据导论…...

什么是 SPI,和API有什么区别?

面试回答 Java 中区分 API 和 SPI&#xff0c;通俗的讲&#xff1a;API 和 SPI 都是相对的概念&#xff0c;他们的差别只在语义上&#xff0c;API 直接被应用开发人员使用&#xff0c;SPI 被框架扩展人员使用。 API Application Programming Interface 大多数情况下&#xff…...

python3 安装clickhouse_sqlalchemy(greenlet) 失败

环境信息&#xff1a; centos7操作系统&#xff0c;python3.8 执行pip3 install clickhouse_sqlalchemy或者pip3 install greenlet报以下报错&#xff1a; Command "/opt/python3.6.10-customized/bin/python3.6 -u -c "import setuptools, tokenize;file/tmp/pip-in…...

五款拿来就能用的炫酷表白代码

「作者主页」&#xff1a;士别三日wyx 「作者简介」&#xff1a;CSDN top100、阿里云博客专家、华为云享专家、网络安全领域优质创作者 「推荐专栏」&#xff1a;小白零基础《Python入门到精通》 五款炫酷表白代码 1、无限弹窗表白2、做我女朋友好吗&#xff0c;不同意就关机3、…...

Springboot 封装整活 Mybatis 动态查询条件SQL自动组装拼接

前言 ps&#xff1a;最近在参与3100保卫战&#xff0c;战况很激烈&#xff0c;刚刚打完仗&#xff0c;来更新一下之前写了一半的博客。 该篇针对日常写查询的时候&#xff0c;那些动态条件sql 做个简单的封装&#xff0c;自动生成&#xff08;抛砖引玉&#xff0c;搞个小玩具&a…...

宝塔部署Java+Vue前后端分离项目经验总结

前言 之前部署服务器都是在Linux环境下自己一点一点安装软件&#xff0c;听说用宝塔傻瓜式部署更快&#xff0c;这次浅浅尝试了一把。 确实简单&#xff01; 1、 买服务器 咋买服务器略&#xff0c;记得服务器装系统就装 Cent OS 7系列即可&#xff0c;我装的7.6。 2、创建…...

【公告】停止更新

CSDN 博客的限制太多了。阅读体验也非常差。后续将不再 CSDN 上更新。 逐步迁移到掘金和个人博客。 欢迎关注 掘金&#xff1a;0xforee 个人博客&#xff1a;0xforee’s blog...

AutoHotKey+VSCode开发扩展推荐

原来一直用的大众推荐的SciTeAHK版&#xff0c;最近发现VSCode更舒服一些&#xff0c;有几个必装的扩展推荐一下&#xff1a; AutoHotkey Plus 请注意不是AutoHotkey Plus Plus。如果在扩展商店里搜索会有两个&#xff0c;一个是Plus&#xff0c;一个是Plus Plus。我选择Pllus&…...

了解 JSON 格式

一、JSON 基础 JSON&#xff08;JavaScript Object Notation&#xff0c;JavaScript 对象表示法&#xff09;是一种轻量级的数据交换格式&#xff0c;JSON 的设计目的是使得数据的存储和交换变得简单。 JSON 易于人的阅读和书写&#xff0c;同时也易于机器的解析和生成。尽管 J…...

[RDMA] 高性能异步的消息传递和RPC :Accelio

1. Introduce Accelio是一个高性能异步的可靠消息传递和RPC库&#xff0c;能优化硬件加速。 RDMA和TCP / IP传输被实现&#xff0c;并且其他的传输也能被实现&#xff0c;如共享存储器可以利用这个高效和方便的API的优点。Accelio 是 Mellanox 公司的RDMA中间件&#xff0c;用…...

typescript报错:‘name‘ was also declared here

问题再现 用 Typescript 时&#xff0c; 遇到一个声明常量 name 的报错。代码如下&#xff1a; let name:string"zhangsan"; let num:number1001;执行编译时报错&#xff1a; 原因 在默认状态下&#xff0c;typescript 将 DOM typings 作为全局的运行环境&#…...

第十章:联邦学习视觉案例

代码 传送门...

c语言——输出一个整数的所有因数

//输出一个整数的所有因数 #include<stdio.h> #include<stdlib.h> int main() {int number,i;printf("输入整数&#xff1a;");scanf("%d",&number);printf(" %d 的因数有&#xff1a; ",number);for(i1;i<number;i){if(numb…...

mqtt学习记录

目录 1 匿名登录2 ⽤户名密码登录&#xff0c;配置接收的主题mosquitto 配置文件修改添加⽤户信息添加topic和⽤户的关系登录演示 3 遗嘱机制 1 匿名登录 ⾸先打开三个终端&#xff0c; 启动代理服务&#xff1a;mosquitto -v -v 详细模式 打印调试信息 默认占⽤&#xff1a;…...

爬虫逆向实战(十八)--某得科技登录

一、数据接口分析 主页地址&#xff1a;某得科技 1、抓包 通过抓包可以发现数据接口是AjaxLogin 2、判断是否有加密参数 请求参数是否加密&#xff1f; 查看“载荷”模块可以发现有一个password加密参数和一个__RequestVerificationToken 请求头是否加密&#xff1f; 无…...

Java-数组

什么是数组 数组&#xff1a;可以看成是相同类型元素的一个集合。在内存中是一段连续的空间。 在java中&#xff0c; 数组中存放的元素其类型相同数组的空间是连在一起的每个空间有自己的编号&#xff0c;起始位置的编号为0&#xff0c;即数组的下标。 数组的创建及初始化 数…...

Dart 入门Hello world

1、下载Dart sdk IntelliJ & Android Studio | Dart 2、安装Dart 插件 3、安装后重启IDEA&#xff0c;创建Dart项目 4、创建dart文件 5、编写函数&#xff1a; void main() {print("Hello world"); } 6、运行&#xff1a; 官网学习&#xff1a;Dart 语言开发文…...

HTML是什么?

HTML是什么&#xff1f; 超文本标记语言&#xff08;英语&#xff1a;HyperText Markup Language&#xff0c;简称&#xff1a;HTML&#xff09;是一种用于创建网页的标准标记语言。 您可以使用 HTML 来建立自己的 WEB 站点&#xff0c;HTML 运行在浏览器上&#xff0c;由浏览器…...

【UniApp开发小程序】商品详情展示+评论、评论展示、评论点赞+商品收藏【后端基于若依管理系统开发】

文章目录 界面效果界面实现工具js页面日期格式化 后端收藏ControllerServicemapper 评论ControllerServiceMapper 商品Controller 阅读Service 界面效果 【说明】 界面中商品的图片来源于闲鱼&#xff0c;若侵权请联系删除 【商品详情】 【评论】 界面实现 工具js 该工…...

rabbitMq安装后无法启动可视化页面http://localhost:15672处理

本次安装环境信息&#xff1a; 系统&#xff1a;win10 64位专业版 erlang&#xff1a;otp_win64_23.0 rabbitMQ&#xff1a;rabbitmq-server-3.8.5 安装rabbitMQ需要依赖erlang语言环境&#xff0c;所以需要我们下载erlang的环境安装程序。 一、下载安装程序 rabbitMQ安装…...

怎样把有用网站做图标放在桌面/软文推广营销平台

2019独角兽企业重金招聘Python工程师标准>>> 关于区块链是什么的话题&#xff0c;估计现在已经烂大街了。但是实际上&#xff0c; 那些我们认为已经非常普通的概念&#xff0c;却往往别有洞天。 1.从社会学角度上来讲&#xff1a; 区块链的概念来自凯文.凯利《失控》…...

郑州做网站的公司msgg/如何制作一个自己的网页网站

我正在使用Apache Commons FTPClient上传大文件&#xff0c;但是传输速度只是使用WinSCP通过FTP传输速度的一小部分。如何加快转移速度&#xff1f;public boolean upload(String host, String user, String password, String directory,String sourcePath, String filename) t…...

小程序游戏制作/东莞百度seo排名

作者&#xff5c;yawn Lauhttp://jvm123.com/2019/08/springboot-activiti.html依赖&#xff1a;新建springBoot项目时勾选activiti&#xff0c;或者在已建立的springBoot项目添加以下依赖&#xff1a;org.activitiactiviti-spring-boot-starter-basic6.0.0配置&#xff1a;数据…...

网站开发程序/seo的优点有哪些

方法&#xff1a; 菜单栏内容找到并替换/删除即可 logo也可以替换/删除...

为什么政府的网站总是做的很差/电商平台运营方案思路

时间限制&#xff1a;1 秒 内存限制&#xff1a;32 兆 特殊判题&#xff1a;否 提交&#xff1a;6729 解决&#xff1a;1981 题目描述&#xff1a;读入一组字符串&#xff08;待操作的&#xff09;&#xff0c;再读入一个int n记录记下来有几条命令&#xff0c;总共有2中命令&a…...

下载网站后台/怎么网站推广

1、大部分SDK的方法需要在线程中执行&#xff0c;一般会放在主线程里执行&#xff0c;安卓中主线程一般用于UI渲染。 1 this.runOnUiThread(new Runnable() { 2 3 Override 4 public void run() { 5 // TODO Auto-generated method …...