kafka复习:(3)自定义序列化器和反序列化器
一、实体类定义:
public class Company {private String name;private String address;public String getName() {return name;}public void setName(String name) {this.name = name;}public String getAddress() {return address;}public void setAddress(String address) {this.address = address;}@Overridepublic String toString() {return "Company{" +"name='" + name + '\'' +", address='" + address + '\'' +'}';}public Company(String name, String address) {this.name = name;this.address = address;}public Company() {}
}
二、自定义序列化器和反序列化器
import org.apache.kafka.common.serialization.Serializer;import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import java.util.Map;public class CompanySerializer implements Serializer<Company> {@Overridepublic void configure(Map<String, ?> configs, boolean isKey) {}//进行字节数组序列化@Overridepublic byte[] serialize(String topic, Company data) {if(data == null){return null;}byte[] name, address;try{if(data.getName() != null){name = data.getName().getBytes("UTF-8");}else {name = new byte[0];}if(data.getAddress() != null){address = data.getAddress().getBytes("UTF-8");}else{address = new byte[0];}ByteBuffer byteBuffer = ByteBuffer.allocate(4 + 4+ name.length + address.length);byteBuffer.putInt(name.length);byteBuffer.put(name);byteBuffer.putInt(address.length);byteBuffer.put(address);return byteBuffer.array();}catch (UnsupportedEncodingException e){e.printStackTrace();}return new byte[0];}@Overridepublic void close() {}
}
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Deserializer;import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import java.util.Map;public class CompanyDeserializer implements Deserializer<Company> {@Overridepublic void configure(Map<String, ?> configs, boolean isKey) {}@Overridepublic Company deserialize(String topic, byte[] data) {if (data == null) {return null;}ByteBuffer buffer = ByteBuffer.wrap(data);int nameLen, addressLen;String name, address;nameLen = buffer.getInt();byte[] nameBytes = new byte[nameLen];buffer.get(nameBytes);addressLen = buffer.getInt();byte[] addressBytes = new byte[addressLen];buffer.get(addressBytes);try {name = new String(nameBytes, "UTF-8");address = new String(addressBytes, "UTF-8");} catch (UnsupportedEncodingException ex) {throw new SerializationException("Error:"+ex.getMessage());}return new Company(name,address);}@Overridepublic void close() {}
}
三、定义生产者和消费者
package com.cisdi.dsp.modules.metaAnalysis.rest;import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;import java.util.Properties;public class CompanyProducer {public static void main(String[] args) throws Exception{Properties properties = new Properties();properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());//设置value的序列化器properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, CompanySerializer.class.getName());properties.put("bootstrap.servers", "xxx.xxx.xxx.xxx:9092");KafkaProducer<String, Company> producer = new KafkaProducer<>(properties);Company company = new Company();company.setAddress("Beijing");company.setName("Connection");ProducerRecord<String, Company> record = new ProducerRecord<>("companyTopic", company);producer.send(record).get();}
}
package com.cisdi.dsp.modules.metaAnalysis.rest;import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;import java.time.Duration;
import java.util.Collections;
import java.util.Properties;public class CompanyConsumer {public static void main(String[] args) {Properties properties=new Properties();properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, CompanyDeserializer.class.getName());properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"xxx.xxx.xxx.xxx:9092");properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG,"my");KafkaConsumer<String,Company> kafkaConsumer=new KafkaConsumer<>(properties);kafkaConsumer.subscribe(Collections.singletonList("companyTopic"));while(true){ConsumerRecords<String,Company> consumerRecords=kafkaConsumer.poll(Duration.ofMillis(1000));for(ConsumerRecord<String,Company> consumerRecord: consumerRecords){System.out.println(consumerRecord.value());}}}
}
相关文章:
kafka复习:(3)自定义序列化器和反序列化器
一、实体类定义: public class Company {private String name;private String address;public String getName() {return name;}public void setName(String name) {this.name name;}public String getAddress() {return address;}public void setAddress(String a…...
Unity 图片资源的适配
前言 最近小编做Unity项目时,发现在资源处理这方面和Android有所不同;例如:Android的资源文件夹res下会有着mipmap-mdpi,mipmap-hdpi,mipmap-xhdpi,mipmap-xxhdpi,mipmap-xxxhdpi这五个文件夹&a…...
【Axure高保真原型】通过输入框动态控制折线图
今天和大家分享通过输入框动态控制折线图的原型模板,在输入框里维护项目数据,可以自动生成对应的折线图,鼠标移入对应折点,可以查看对应数据。使用也非常方便,只需要修改输入框里的数据,或者复制粘贴文本&a…...
【Java】树结构数据的搜索
这里写自定义目录标题 需要实现的效果前端需要的json格式:一定是一个完整的树结构错误错误的返回格式错误的返回格式实现的效果 正确正确的返回格式正确的展示画面 后端逻辑分析代码总览 数据库表结构 需要实现的效果 前端需要的json格式:一定是一个完整…...
ElementUI中的日历组件加载无效的问题
在ElementUI中提供了一个日历组件。在某些场景下还是比较有用的。只是在使用的时候会有些下坑,大家要注意下。 官网提供的信息比较简介。我们在引入到项目中使用的时候可以能会出现下面的错误提示。 Unknown custom element: <el-calendar> - did you …...
Git版本管理(03)stash临时操作和.gitignore配置
1 git stash操作(临时存储) 1.1 git stash常见流程 当你修改了某一个分支,但此时要切换分支时如果直接切换会因为一些修改冲突而checkout失败,那么此时就可以使用git stash命令来解决该问题。一般流程为: $git pull# 将当前未提交的修改…...
【ThingJS | 3D可视化】开发框架,一站式数字孪生
博主:_LJaXi Or 東方幻想郷 专栏: 数字孪生 | 3D可视化框架 开发工具:ThingJS在线开发工具 ThingJs 低代码开发 ThingJs 低代码开发注意点场景效果配置层级层级常用API实例化 Thing,加载场景load 加载函数ThingJs 层级关系图查找层…...
SpringBoot返回响应排除为 null 的字段
SpringBoot返回响应排除为 null 的字段 可以通过全局配置,使返回响应中为null的字段,不在出现在返回结果中。 注意:这样配置,使得返回响应包含的字段随请求结果变化,响应到底包含哪些字段不直观;除非业务…...
华为数通方向HCIP-DataCom H12-821题库(单选题:41-60)
第41题 以下关于IS-IS协议说法错误的是? A、IS-IS协议支持CLNP网络 B、IS-IS 协议支持IP 网络 C、IS-IS 协议的报文直接由数据链路层封装 D、IS-IS协议是运行在AS之间的链路状态协议 答案:D 解析: 关于IS-IS协议的说法错误是D. IS-IS协议是运行在A…...
OpenAI推出GPT-3.5Turbo微调功能并更新API;Midjourney更新局部绘制功能
🦉 AI新闻 🚀 OpenAI推出GPT-3.5Turbo微调功能并更新API,将提供GPT-4微调功能 摘要:OpenAI宣布推出GPT-3.5Turbo微调功能,并更新API,使企业和开发者能够定制ChatGPT,达到或超过GPT-4的能力。通…...
相机成像之3A算法的综述
3A算法是摄像机成像控制技术中的三大自动控制算法。随着计算机视觉的迅速发展,该算法在摄像器材领域具有广泛的应用和前景。 那么3A控制算法又是指什么呢? (1)AE (Auto Exposure)自动曝光控制 (2)AF (Auto Focus)自动聚焦控制 (3)AWB (Auto White Balance)自动白平衡控…...
最新AI系统ChatGPT程序源码/微信公众号/H5端+搭建部署教程+完整知识库
一、前言 SparkAi系统是基于国外很火的ChatGPT进行开发的Ai智能问答系统。本期针对源码系统整体测试下来非常完美,可以说SparkAi是目前国内一款的ChatGPT对接OpenAI软件系统。 那么如何搭建部署AI创作ChatGPT?小编这里写一个详细图文教程吧!…...
OpenCV实例(九)基于深度学习的运动目标检测(二)YOLOv2概述
基于深度学习的运动目标检测(二)YOLOv2&YOLOv3概述 1.YOLOv2概述2.YOLOv3概述2.1 新的基础网络结构:2.2 采用多尺度预测机制。2.3 使用简单的逻辑回归进行分类 1.YOLOv2概述 对YOLO存在的不足,业界又推出了YOLOv2。YOLOv2主要…...
【Docker】已经创建好的Docker怎么设置开机自启
已经创建好的Docker怎么设置开机自启 1.使用命令Docker update来完成2.查看是否开启3.验证是否开启 1.使用命令Docker update来完成 操作步骤: docker update --restartalways 容器ID2.查看是否开启 docker inspect 容器Id看到这里RestartPolicy设置为如图&#…...
E - Excellent Views
Problem - E - Codeforces 问题描述:数组H大小都不相同。从i到j是可行的,当且仅当 不存在 k ,使 ∣ i − k ∣ ≤ ∣ i − j ∣ , H k > H j 不存在k,使 \\ |i - k| \leq |i - j|, \quad H_k > H_j 不存在k,使…...
WiFi天线和NB-IoT天线不通用
表面看起来完全一样。但是把WiFi天线插到NB-IoT设备后,信号弱了很多。还导致设备反复重启...
IoT DC3 是一个基于 Spring Cloud 的开源的、分布式的物联网(IoT)平台本地部署步骤
dc3 windows 本地搭建步骤: 必要软件环境 进入原网页# 务必保证至少需要给 docker 分配:1 核 CPU 以及 4G 以上的运行内存! JDK : 推荐使用 Oracle JDK 1.8 或者 OpenJDK8,理论来说其他版本也行; Maven : 推荐…...
VBA Excel自定义函数的使用 简单的语法
一个简单的教程,实现VBA自定义函数。 新建模块 复制后面的代码放进来 函数的入口参数不定义,则认为是一块区域; 反之,如FindChar1 As String,则认为是输入的单值。 循环和分支如下例子,VB比较接近自然语…...
字节跳动 从需求到上线全流程 软件工程流程 需求评估 MVP
走进后端开发流程 整个课程会带大家先从理论出发,思考为什么有流程 大家以后工作的团队可能不一样,那么不同的团队也会有不同的流程,这背后的逻辑是什么 然后会带大家按照走一遍从需求到上线的全流程,告诉大家在流程的每个阶段&am…...
线性代数-矩阵的本质
线性代数-矩阵的本质 线性代数-矩阵的本质...
React基础入门之虚拟Dom
React官方文档:https://react.docschina.org/ 说明 重要提示:本系列文章基础篇总结自尚硅谷课程,且采用类式写法!!最新的函数式组件写法见高级篇。 本系列文档旨在帮助vue同学更快速的学习react,如果你很…...
C++基础Ⅰ编译、链接
目录儿 1 C是如何工作的1.1 预处理语句1.2 include1.3 main()1.4 编译单独编译项目编译 1.5 链接 2 定义和调用函数3 编译器如何工作3.1 编译3.1.1 引入头文件系统头文件自定义头文件 3.1.2 自定义类型3.1.3 条件判断拓展: 汇编 3.2 链接3.2.1 起始函数3.2.2 被调用的函数 3.3 …...
VMware和ubuntu配置Hadoop环境
目录 一、获取VMware安装包 1、官网获取 1)首先先进入官网,官网首页是下面这样: 2)接着点击产品选项 3)进入后点击查看所有产品,然后在右上角选择排序方式为Z到A,然后向下滑动找到Workstation…...
uview2.0自定义tabbar
tabbar组件 <template><u-tabbar :value"tab" change"changeTab" :fixed"true" :border"true" :placeholder"true":safeAreaInsetBottom"true"><u-tabbar-item text"消息" icon"c…...
Star History 月度开源精选|Llama 2 及周边生态特辑
7 月 18 日,Meta 发布了 Llama,大语言模型 Llama 1 的进阶版,可以自由免费用于研究和商业,支持私有化部署。 所以本期 Star History 的主题是:帮助你快速把 Llama 2 在自己机器上跑起来的开源工具,无论你的…...
修改电脑上路由表使笔记本默认走无线
如果笔记本上即连接了有线,也连接了无线,默认电脑会走有线的,通过route print命令查看路由表就可以看出来,因为无线的“metric”跳数要比有线的高 解决方法: 如果想实现让默认走无线,就需要修改自己电脑的…...
Spring Cache的介绍以及怎么使用(redis)
Spring Cache 文章目录 Spring Cache1、Spring Cache介绍2、Spring Cache常用注解2.1、EnableCaching注解2.2、CachePut注解2.3、CacheEvict注解2.4、Cacheable注解 3、Spring Cache使用方式--redis 1、Spring Cache介绍 Spring Cache是一个框架,实现了基于注解的缓…...
软考高级系统架构设计师系列论文六十九:论信息系统的安全风险评估
一、信息系统相关知识点 软考高级信息系统项目管理师系列之四十三:信息系统安全管理软考高级系统架构设计师:系统安全分析与设计...
Ubuntu系统安装之后首需要做的事情
Ubuntu系统的初步环境搭建 1、换源2、显卡3、浏览器4、输入法5、终端6、ROS7、VSCode8、设置时间与win一致9、 TimeShift10、 Anaconda(考虑装不装) 1、换源 点开Software&&Update,找到Ubuntu Software中的Download from,…...
Qt——QPushButton控件的常见属性、方法和信号
Qt中QPushButton控件的常见属性、方法和信号 一、QPushButton控件常见属性 一、QPushButton控件常见方法 一、QPushButton控件常见信号 一、QPushButton控件常见属性(Properties) 1. text: 描述:按钮上显示的文本。 用法: butto…...
网站开发 文件上传慢/推广网站的文案
本系列重点是涉及 配置过程 ,对注释的用法不多介绍。 注释语法越来越多的被业界所使用,并且注释配置相对于 XML 配置具有很多的优势:它可以充分利用 Java 的反射机制获取类结构信息,这些信息可以有效减少配置的工作。注释和 Java 代码位于一个…...
济南市个人网站制作/谷歌平台推广外贸
1. 简单三级联动 效果图: 思想:当选择省时,创建对应的市子对象,并将其加入父元素中。当选择某一个市时,创建对应的区子对象,并将其加入父元素中。当选择其他省/市时,对应的市、区/区的选项都变…...
如何快速建设自适应网站/深圳网络营销策划
本节书摘来自华章计算机《大数据架构和算法实现之路:电商系统的技术实战》一书中的第2章,第2.4节,作者 黄 申,更多章节内容可以访问云栖社区“华章计算机”公众号查看。 2.4 案例实践 2.4.1 使用R进行K均值聚类 在实践部分&…...
那个网站可以做网站测速对比/百度推广后台登录页面
文章目录1.下载安装包2.上传安装包3.修改配置文件3.1修改zeppelin-site.xml文件3.2修改zeppelin-env.sh文件4.启动zeppelin5.配置hive解释器5.1环境和变量配置5.2 在web界面配置集成hive6.使用Zepplin的hive解释器1.下载安装包 官网直达 选择zeppelin-0.8.1-bin-all.tgz 2.上…...
企业网站的公司和产品信息的介绍与网络营销关系/深圳网站制作推广
题目:原题链接(中等) 标签:广度优先搜索、深度优先搜索 解法时间复杂度空间复杂度执行用时Ans 1 (Python)O(N84)O(N8^4)O(N84)O(N)O(N)O(N)36ms (79.65%)Ans 2 (Python)Ans 3 (Python) 解法一: class Solution:_CHA…...
国内做免费视频网站有哪些/浙江seo推广
本篇文章我们来讲讲如何对MySQL数据库进行更新操作,对数据库进行增删改查操作是我们必会的基础之一,会了这个增删改查我们可以在这基础上去编写更多的东西,废话不多说了,我们来看一下本篇文章的内容吧!1、第一种&#…...