当前位置: 首页 > news >正文

kafka复习:(3)自定义序列化器和反序列化器

一、实体类定义:


public class Company {private String name;private String address;public String getName() {return name;}public void setName(String name) {this.name = name;}public String getAddress() {return address;}public void setAddress(String address) {this.address = address;}@Overridepublic String toString() {return "Company{" +"name='" + name + '\'' +", address='" + address + '\'' +'}';}public Company(String name, String address) {this.name = name;this.address = address;}public Company() {}
}

二、自定义序列化器和反序列化器


import org.apache.kafka.common.serialization.Serializer;import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import java.util.Map;public class CompanySerializer implements Serializer<Company> {@Overridepublic void configure(Map<String, ?> configs, boolean isKey) {}//进行字节数组序列化@Overridepublic byte[] serialize(String topic, Company data) {if(data == null){return null;}byte[] name, address;try{if(data.getName() != null){name = data.getName().getBytes("UTF-8");}else {name = new byte[0];}if(data.getAddress() != null){address = data.getAddress().getBytes("UTF-8");}else{address = new byte[0];}ByteBuffer byteBuffer = ByteBuffer.allocate(4 + 4+ name.length + address.length);byteBuffer.putInt(name.length);byteBuffer.put(name);byteBuffer.putInt(address.length);byteBuffer.put(address);return byteBuffer.array();}catch (UnsupportedEncodingException e){e.printStackTrace();}return new byte[0];}@Overridepublic void close() {}
}

import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Deserializer;import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import java.util.Map;public class CompanyDeserializer implements Deserializer<Company> {@Overridepublic void configure(Map<String, ?> configs, boolean isKey) {}@Overridepublic Company deserialize(String topic, byte[] data) {if (data == null) {return null;}ByteBuffer buffer = ByteBuffer.wrap(data);int nameLen, addressLen;String name, address;nameLen = buffer.getInt();byte[] nameBytes = new byte[nameLen];buffer.get(nameBytes);addressLen = buffer.getInt();byte[] addressBytes = new byte[addressLen];buffer.get(addressBytes);try {name = new String(nameBytes, "UTF-8");address = new String(addressBytes, "UTF-8");} catch (UnsupportedEncodingException ex) {throw new SerializationException("Error:"+ex.getMessage());}return new Company(name,address);}@Overridepublic void close() {}
}

三、定义生产者和消费者

package com.cisdi.dsp.modules.metaAnalysis.rest;import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;import java.util.Properties;public class CompanyProducer {public static void main(String[] args) throws Exception{Properties properties = new Properties();properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());//设置value的序列化器properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, CompanySerializer.class.getName());properties.put("bootstrap.servers", "xxx.xxx.xxx.xxx:9092");KafkaProducer<String, Company> producer = new KafkaProducer<>(properties);Company company = new Company();company.setAddress("Beijing");company.setName("Connection");ProducerRecord<String, Company> record = new ProducerRecord<>("companyTopic", company);producer.send(record).get();}
}
package com.cisdi.dsp.modules.metaAnalysis.rest;import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;import java.time.Duration;
import java.util.Collections;
import java.util.Properties;public class CompanyConsumer {public static void main(String[] args) {Properties properties=new Properties();properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, CompanyDeserializer.class.getName());properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"xxx.xxx.xxx.xxx:9092");properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG,"my");KafkaConsumer<String,Company> kafkaConsumer=new KafkaConsumer<>(properties);kafkaConsumer.subscribe(Collections.singletonList("companyTopic"));while(true){ConsumerRecords<String,Company> consumerRecords=kafkaConsumer.poll(Duration.ofMillis(1000));for(ConsumerRecord<String,Company> consumerRecord: consumerRecords){System.out.println(consumerRecord.value());}}}
}

相关文章:

kafka复习:(3)自定义序列化器和反序列化器

一、实体类定义&#xff1a; public class Company {private String name;private String address;public String getName() {return name;}public void setName(String name) {this.name name;}public String getAddress() {return address;}public void setAddress(String a…...

Unity 图片资源的适配

前言 最近小编做Unity项目时&#xff0c;发现在资源处理这方面和Android有所不同&#xff1b;例如&#xff1a;Android的资源文件夹res下会有着mipmap-mdpi&#xff0c;mipmap-hdpi&#xff0c;mipmap-xhdpi&#xff0c;mipmap-xxhdpi&#xff0c;mipmap-xxxhdpi这五个文件夹&a…...

【Axure高保真原型】通过输入框动态控制折线图

今天和大家分享通过输入框动态控制折线图的原型模板&#xff0c;在输入框里维护项目数据&#xff0c;可以自动生成对应的折线图&#xff0c;鼠标移入对应折点&#xff0c;可以查看对应数据。使用也非常方便&#xff0c;只需要修改输入框里的数据&#xff0c;或者复制粘贴文本&a…...

【Java】树结构数据的搜索

这里写自定义目录标题 需要实现的效果前端需要的json格式&#xff1a;一定是一个完整的树结构错误错误的返回格式错误的返回格式实现的效果 正确正确的返回格式正确的展示画面 后端逻辑分析代码总览 数据库表结构 需要实现的效果 前端需要的json格式&#xff1a;一定是一个完整…...

ElementUI中的日历组件加载无效的问题

在ElementUI中提供了一个日历组件。在某些场景下还是比较有用的。只是在使用的时候会有些下坑&#xff0c;大家要注意下。   官网提供的信息比较简介。我们在引入到项目中使用的时候可以能会出现下面的错误提示。 Unknown custom element: <el-calendar> - did you …...

Git版本管理(03)stash临时操作和.gitignore配置

1 git stash操作(临时存储) 1.1 git stash常见流程 当你修改了某一个分支&#xff0c;但此时要切换分支时如果直接切换会因为一些修改冲突而checkout失败&#xff0c;那么此时就可以使用git stash命令来解决该问题。一般流程为&#xff1a; $git pull# 将当前未提交的修改…...

【ThingJS | 3D可视化】开发框架,一站式数字孪生

博主&#xff1a;_LJaXi Or 東方幻想郷 专栏&#xff1a; 数字孪生 | 3D可视化框架 开发工具&#xff1a;ThingJS在线开发工具 ThingJs 低代码开发 ThingJs 低代码开发注意点场景效果配置层级层级常用API实例化 Thing&#xff0c;加载场景load 加载函数ThingJs 层级关系图查找层…...

SpringBoot返回响应排除为 null 的字段

SpringBoot返回响应排除为 null 的字段 可以通过全局配置&#xff0c;使返回响应中为null的字段&#xff0c;不在出现在返回结果中。 注意&#xff1a;这样配置&#xff0c;使得返回响应包含的字段随请求结果变化&#xff0c;响应到底包含哪些字段不直观&#xff1b;除非业务…...

华为数通方向HCIP-DataCom H12-821题库(单选题:41-60)

第41题 以下关于IS-IS协议说法错误的是? A、IS-IS协议支持CLNP网络 B、IS-IS 协议支持IP 网络 C、IS-IS 协议的报文直接由数据链路层封装 D、IS-IS协议是运行在AS之间的链路状态协议 答案&#xff1a;D 解析&#xff1a; 关于IS-IS协议的说法错误是D. IS-IS协议是运行在A…...

OpenAI推出GPT-3.5Turbo微调功能并更新API;Midjourney更新局部绘制功能

&#x1f989; AI新闻 &#x1f680; OpenAI推出GPT-3.5Turbo微调功能并更新API&#xff0c;将提供GPT-4微调功能 摘要&#xff1a;OpenAI宣布推出GPT-3.5Turbo微调功能&#xff0c;并更新API&#xff0c;使企业和开发者能够定制ChatGPT&#xff0c;达到或超过GPT-4的能力。通…...

相机成像之3A算法的综述

3A算法是摄像机成像控制技术中的三大自动控制算法。随着计算机视觉的迅速发展,该算法在摄像器材领域具有广泛的应用和前景。 那么3A控制算法又是指什么呢? (1)AE (Auto Exposure)自动曝光控制 (2)AF (Auto Focus)自动聚焦控制 (3)AWB (Auto White Balance)自动白平衡控…...

最新AI系统ChatGPT程序源码/微信公众号/H5端+搭建部署教程+完整知识库

一、前言 SparkAi系统是基于国外很火的ChatGPT进行开发的Ai智能问答系统。本期针对源码系统整体测试下来非常完美&#xff0c;可以说SparkAi是目前国内一款的ChatGPT对接OpenAI软件系统。 那么如何搭建部署AI创作ChatGPT&#xff1f;小编这里写一个详细图文教程吧&#xff01…...

OpenCV实例(九)基于深度学习的运动目标检测(二)YOLOv2概述

基于深度学习的运动目标检测&#xff08;二&#xff09;YOLOv2&YOLOv3概述 1.YOLOv2概述2.YOLOv3概述2.1 新的基础网络结构&#xff1a;2.2 采用多尺度预测机制。2.3 使用简单的逻辑回归进行分类 1.YOLOv2概述 对YOLO存在的不足&#xff0c;业界又推出了YOLOv2。YOLOv2主要…...

【Docker】已经创建好的Docker怎么设置开机自启

已经创建好的Docker怎么设置开机自启 1.使用命令Docker update来完成2.查看是否开启3.验证是否开启 1.使用命令Docker update来完成 操作步骤&#xff1a; docker update --restartalways 容器ID2.查看是否开启 docker inspect 容器Id看到这里RestartPolicy设置为如图&#…...

E - Excellent Views

Problem - E - Codeforces 问题描述&#xff1a;数组H大小都不相同。从i到j是可行的&#xff0c;当且仅当 不存在 k &#xff0c;使 ∣ i − k ∣ ≤ ∣ i − j ∣ , H k > H j 不存在k&#xff0c;使 \\ |i - k| \leq |i - j|, \quad H_k > H_j 不存在k&#xff0c;使…...

WiFi天线和NB-IoT天线不通用

表面看起来完全一样。但是把WiFi天线插到NB-IoT设备后&#xff0c;信号弱了很多。还导致设备反复重启...

IoT DC3 是一个基于 Spring Cloud 的开源的、分布式的物联网(IoT)平台本地部署步骤

dc3 windows 本地搭建步骤&#xff1a; ​​ 必要软件环境 进入原网页# 务必保证至少需要给 docker 分配&#xff1a;1 核 CPU 以及 4G 以上的运行内存&#xff01; JDK : 推荐使用 Oracle JDK 1.8 或者 OpenJDK8&#xff0c;理论来说其他版本也行&#xff1b; Maven : 推荐…...

VBA Excel自定义函数的使用 简单的语法

一个简单的教程&#xff0c;实现VBA自定义函数。 新建模块 复制后面的代码放进来 函数的入口参数不定义&#xff0c;则认为是一块区域&#xff1b; 反之&#xff0c;如FindChar1 As String&#xff0c;则认为是输入的单值。 循环和分支如下例子&#xff0c;VB比较接近自然语…...

字节跳动 从需求到上线全流程 软件工程流程 需求评估 MVP

走进后端开发流程 整个课程会带大家先从理论出发&#xff0c;思考为什么有流程 大家以后工作的团队可能不一样&#xff0c;那么不同的团队也会有不同的流程&#xff0c;这背后的逻辑是什么 然后会带大家按照走一遍从需求到上线的全流程&#xff0c;告诉大家在流程的每个阶段&am…...

线性代数-矩阵的本质

线性代数-矩阵的本质 线性代数-矩阵的本质...

Docker 运行 Kafka 带 SASL 认证教程

Docker 运行 Kafka 带 SASL 认证教程 Docker 运行 Kafka 带 SASL 认证教程一、说明二、环境准备三、编写 Docker Compose 和 jaas文件docker-compose.yml代码说明&#xff1a;server_jaas.conf 四、启动服务五、验证服务六、连接kafka服务七、总结 Docker 运行 Kafka 带 SASL 认…...

visual studio 2022更改主题为深色

visual studio 2022更改主题为深色 点击visual studio 上方的 工具-> 选项 在选项窗口中&#xff0c;选择 环境 -> 常规 &#xff0c;将其中的颜色主题改成深色 点击确定&#xff0c;更改完成...

postgresql|数据库|只读用户的创建和删除(备忘)

CREATE USER read_only WITH PASSWORD 密码 -- 连接到xxx数据库 \c xxx -- 授予对xxx数据库的只读权限 GRANT CONNECT ON DATABASE xxx TO read_only; GRANT USAGE ON SCHEMA public TO read_only; GRANT SELECT ON ALL TABLES IN SCHEMA public TO read_only; GRANT EXECUTE O…...

OkHttp 中实现断点续传 demo

在 OkHttp 中实现断点续传主要通过以下步骤完成&#xff0c;核心是利用 HTTP 协议的 Range 请求头指定下载范围&#xff1a; 实现原理 Range 请求头&#xff1a;向服务器请求文件的特定字节范围&#xff08;如 Range: bytes1024-&#xff09; 本地文件记录&#xff1a;保存已…...

React19源码系列之 事件插件系统

事件类别 事件类型 定义 文档 Event Event 接口表示在 EventTarget 上出现的事件。 Event - Web API | MDN UIEvent UIEvent 接口表示简单的用户界面事件。 UIEvent - Web API | MDN KeyboardEvent KeyboardEvent 对象描述了用户与键盘的交互。 KeyboardEvent - Web…...

微服务商城-商品微服务

数据表 CREATE TABLE product (id bigint(20) UNSIGNED NOT NULL AUTO_INCREMENT COMMENT 商品id,cateid smallint(6) UNSIGNED NOT NULL DEFAULT 0 COMMENT 类别Id,name varchar(100) NOT NULL DEFAULT COMMENT 商品名称,subtitle varchar(200) NOT NULL DEFAULT COMMENT 商…...

C# 类和继承(抽象类)

抽象类 抽象类是指设计为被继承的类。抽象类只能被用作其他类的基类。 不能创建抽象类的实例。抽象类使用abstract修饰符声明。 抽象类可以包含抽象成员或普通的非抽象成员。抽象类的成员可以是抽象成员和普通带 实现的成员的任意组合。抽象类自己可以派生自另一个抽象类。例…...

初探Service服务发现机制

1.Service简介 Service是将运行在一组Pod上的应用程序发布为网络服务的抽象方法。 主要功能&#xff1a;服务发现和负载均衡。 Service类型的包括ClusterIP类型、NodePort类型、LoadBalancer类型、ExternalName类型 2.Endpoints简介 Endpoints是一种Kubernetes资源&#xf…...

Kafka入门-生产者

生产者 生产者发送流程&#xff1a; 延迟时间为0ms时&#xff0c;也就意味着每当有数据就会直接发送 异步发送API 异步发送和同步发送的不同在于&#xff1a;异步发送不需要等待结果&#xff0c;同步发送必须等待结果才能进行下一步发送。 普通异步发送 首先导入所需的k…...

SpringAI实战:ChatModel智能对话全解

一、引言&#xff1a;Spring AI 与 Chat Model 的核心价值 &#x1f680; 在 Java 生态中集成大模型能力&#xff0c;Spring AI 提供了高效的解决方案 &#x1f916;。其中 Chat Model 作为核心交互组件&#xff0c;通过标准化接口简化了与大语言模型&#xff08;LLM&#xff0…...