当前位置: 首页 > news >正文

MQ:kafka-消费者的三种语义

文章目录

  • 前言
  • (一) 创建topic
  • (二) 生产者
  • (三)消费者
    • 1. At-most-once Kafka Consumer
    • 2. At-least-once kafka consumer
    • 3. 使用subscribe实现Exactly-once
    • 4. 使用assign实现Exactly-once

前言

本文主要是以kafka 09的client为例子,详解kafka client的使用,包括kafka消费者的三种消费语义at-most-once, at-least-once, 和 exactly-once message ,生产者的使用等。

(一) 创建topic

bin/kafka-topics --zookeeper localhost:2181 --create --topic normal-topic --partitions 2 --replication-factor 1

(二) 生产者

public class ProducerExample {public static void main(String[] str) throws InterruptedException, IOException {System.out.println("Starting ProducerExample ...");sendMessages();}private static void sendMessages() throws InterruptedException, IOException {Producer<String, String> producer = createProducer();sendMessages(producer);// Allow the producer to complete sending of the messages before program exit.Thread.sleep(20);}private static Producer<String, String> createProducer() {Properties props = new Properties();props.put("bootstrap.servers", "localhost:9092");props.put("acks", "all");props.put("retries", 0);// Controls how much bytes sender would wait to batch up before publishing to Kafka.props.put("batch.size", 10);props.put("linger.ms", 1);props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");return new KafkaProducer(props);}private static void sendMessages(Producer<String, String> producer) {String topic = "normal-topic";int partition = 0;long record = 1;for (int i = 1; i <= 10; i++) {producer.send(new ProducerRecord<String, String>(topic, partition,                                 Long.toString(record),Long.toString(record++)));}}
}

(三)消费者

消费者注册到kafka有多种方式:

subscribe:这种方式在新增topic或者partition或者消费者增加或者消费者减少的时候,会进行消费者组内消费者的再平衡。

assign:这种方式注册的消费者不会进行rebalance。

上面两种方式都是可以实现,三种消费语义的。具体API的使用请看下文。

1. At-most-once Kafka Consumer

做多一次消费语义是kafka消费者的默认实现。配置这种消费者最简单的方式是

1). enable.auto.commit设置为true。

2). auto.commit.interval.ms设置为一个较低的时间范围。

3). consumer.commitSync()不要调用该方法。

由于上面的配置,就可以使得kafka有线程负责按照指定间隔提交offset。

但是这种方式会使得kafka消费者有两种消费语义:

a.最多一次语义->at-most-once

消费者的offset已经提交,但是消息还在处理,这个时候挂了,再重启的时候会从上次提交的offset处消费,导致上次在处理的消息部分丢失。

b. 最少一次消费语义->at-least-once

消费者已经处理完了,但是offset还没提交,那么这个时候消费者挂了,就会导致消费者重复消费消息处理。但是由于auto.commit.interval.ms设置为一个较低的时间范围,会降低这种情况出现的概率。

代码如下:

public class AtMostOnceConsumer {public static void main(String[] str) throws InterruptedException {System.out.println("Starting  AtMostOnceConsumer ...");execute();}private static void execute() throws InterruptedException {KafkaConsumer<String, String> consumer = createConsumer();// Subscribe to all partition in that topic. 'assign' could be used here// instead of 'subscribe' to subscribe to specific partition.consumer.subscribe(Arrays.asList("normal-topic"));processRecords(consumer);}private static KafkaConsumer<String, String> createConsumer() {Properties props = new Properties();props.put("bootstrap.servers", "localhost:9092");String consumeGroup = "cg1";props.put("group.id", consumeGroup);// Set this property, if auto commit should happen.props.put("enable.auto.commit", "true");// Auto commit interval, kafka would commit offset at this interval.props.put("auto.commit.interval.ms", "101");// This is how to control number of records being read in each pollprops.put("max.partition.fetch.bytes", "135");// Set this if you want to always read from beginning.// props.put("auto.offset.reset", "earliest");props.put("heartbeat.interval.ms", "3000");props.put("session.timeout.ms", "6001");props.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");props.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");return new KafkaConsumer<String, String>(props);}private static void processRecords(KafkaConsumer<String, String> consumer)  {while (true) {ConsumerRecords<String, String> records = consumer.poll(100);long lastOffset = 0;for (ConsumerRecord<String, String> record : records) {System.out.printf("\n\roffset = %d, key = %s, value = %s", record.offset(),                                             record.key(), record.value());lastOffset = record.offset();}System.out.println("lastOffset read: " + lastOffset);process();}}private static void process() throws InterruptedException {// create some delay to simulate processing of the message.Thread.sleep(20);}
}

2. At-least-once kafka consumer

实现最少一次消费语义的消费者也很简单。

1). 设置enable.auto.commit为false

2). 消息处理完之后手动调用consumer.commitSync()

这种方式就是要手动在处理完该次poll得到消息之后,调用offset异步提交函数consumer.commitSync()。建议是消费者内部实现密等,来避免消费者重复处理消息进而得到重复结果。最多一次发生的场景是消费者的消息处理完并输出到结果库(也可能是部分处理完),但是offset还没提交,这个时候消费者挂掉了,再重启的时候会重新消费并处理消息。

代码如下:

public class AtLeastOnceConsumer {public static void main(String[] str) throws InterruptedException {System.out.println("Starting AutoOffsetGuranteedAtLeastOnceConsumer ...");execute();}private static void execute() throws InterruptedException {KafkaConsumer<String, String> consumer = createConsumer();// Subscribe to all partition in that topic. 'assign' could be used here// instead of 'subscribe' to subscribe to specific partition.consumer.subscribe(Arrays.asList("normal-topic"));processRecords(consumer);}private static KafkaConsumer<String, String> createConsumer() {Properties props = new Properties();props.put("bootstrap.servers", "localhost:9092");String consumeGroup = "cg1";props.put("group.id", consumeGroup);// Set this property, if auto commit should happen.props.put("enable.auto.commit", "true");// Make Auto commit interval to a big number so that auto commit does not happen,// we are going to control the offset commit via consumer.commitSync(); after processing             // message.props.put("auto.commit.interval.ms", "999999999999");// This is how to control number of messages being read in each pollprops.put("max.partition.fetch.bytes", "135");props.put("heartbeat.interval.ms", "3000");props.put("session.timeout.ms", "6001");props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");return new KafkaConsumer<String, String>(props);}private static void processRecords(KafkaConsumer<String, String> consumer) throws {while (true) {ConsumerRecords<String, String> records = consumer.poll(100);long lastOffset = 0;for (ConsumerRecord<String, String> record : records) {System.out.printf("\n\roffset = %d, key = %s, value = %s", record.offset(),                                         record.key(), record.value());lastOffset = record.offset();}System.out.println("lastOffset read: " + lastOffset);process();// Below call is important to control the offset commit. Do this call after you// finish processing the business process.consumer.commitSync();}}private static void process() throws InterruptedException {// create some delay to simulate processing of the record.Thread.sleep(20);}
}

3. 使用subscribe实现Exactly-once

使用subscribe实现Exactly-once 很简单,具体思路如下:

1). 将enable.auto.commit设置为false。

2). 不调用consumer.commitSync()。

3). 使用subcribe定于topic。

4). 实现一个ConsumerRebalanceListener,在该listener内部执行

consumer.seek(topicPartition,offset),从指定的topic/partition的offset处启动。

5). 在处理消息的时候,要同时控制保存住每个消息的offset。以原子事务的方式保存offset和处理的消息结果。传统数据库实现原子事务比较简单。但对于非传统数据库,比如hdfs或者nosql,为了实现这个目标,只能将offset与消息保存在同一行。

6). 实现密等,作为保护层。

代码如下:

public class ExactlyOnceDynamicConsumer {private static OffsetManager offsetManager = new OffsetManager("storage2");public static void main(String[] str) throws InterruptedException {System.out.println("Starting ExactlyOnceDynamicConsumer ...");readMessages();}private static void readMessages() throws InterruptedException {KafkaConsumer<String, String> consumer = createConsumer();// Manually controlling offset but register consumer to topics to get dynamically//  assigned partitions. Inside MyConsumerRebalancerListener use// consumer.seek(topicPartition,offset) to control offset which messages to be read.consumer.subscribe(Arrays.asList("normal-topic"),new MyConsumerRebalancerListener(consumer));processRecords(consumer);}private static KafkaConsumer<String, String> createConsumer() {Properties props = new Properties();props.put("bootstrap.servers", "localhost:9092");String consumeGroup = "cg3";props.put("group.id", consumeGroup);// Below is a key setting to turn off the auto commit.props.put("enable.auto.commit", "false");props.put("heartbeat.interval.ms", "2000");props.put("session.timeout.ms", "6001");// Control maximum data on each poll, make sure this value is bigger than the maximum                   // single message sizeprops.put("max.partition.fetch.bytes", "140");props.put("key.deserializer",                                 "org.apache.kafka.common.serialization.StringDeserializer");props.put("value.deserializer",                         "org.apache.kafka.common.serialization.StringDeserializer");return new KafkaConsumer<String, String>(props);}private static void processRecords(KafkaConsumer<String, String> consumer) {while (true) {ConsumerRecords<String, String> records = consumer.poll(100);for (ConsumerRecord<String, String> record : records) {System.out.printf("offset = %d, key = %s, value = %s\n", record.offset(),                                     record.key(), record.value());// Save processed offset in external storage.offsetManager.saveOffsetInExternalStore(record.topic(), record.partition(),                                             record.offset());}}}
}
public class MyConsumerRebalancerListener implements                                 org.apache.kafka.clients.consumer.ConsumerRebalanceListener {private OffsetManager offsetManager = new OffsetManager("storage2");private Consumer<String, String> consumer;public MyConsumerRebalancerListener(Consumer<String, String> consumer) {this.consumer = consumer;}public void onPartitionsRevoked(Collection<TopicPartition> partitions) {for (TopicPartition partition : partitions) {offsetManager.saveOffsetInExternalStore(partition.topic(), partition.partition(),                     consumer.position(partition));}}public void onPartitionsAssigned(Collection<TopicPartition> partitions) {for (TopicPartition partition : partitions) {consumer.seek(partition,                             offsetManager.readOffsetFromExternalStore(partition.topic(),                             partition.partition()));}}
}
/**
* The partition offset are stored in an external storage. In this case in a local file system where 
* program runs.
*/
public class OffsetManager {private String storagePrefix;public OffsetManager(String storagePrefix) {this.storagePrefix = storagePrefix;}/*** Overwrite the offset for the topic in an external storage.** @param topic - Topic name.* @param partition - Partition of the topic.* @param offset - offset to be stored.*/void saveOffsetInExternalStore(String topic, int partition, long offset) {try {FileWriter writer = new FileWriter(storageName(topic, partition), false);BufferedWriter bufferedWriter = new BufferedWriter(writer);bufferedWriter.write(offset + "");bufferedWriter.flush();bufferedWriter.close();} catch (Exception e) {e.printStackTrace();throw new RuntimeException(e);}}/*** @return he last offset + 1 for the provided topic and partition.*/long readOffsetFromExternalStore(String topic, int partition) {try {Stream<String> stream = Files.lines(Paths.get(storageName(topic, partition)));return Long.parseLong(stream.collect(Collectors.toList()).get(0)) + 1;} catch (Exception e) {e.printStackTrace();}return 0;}private String storageName(String topic, int partition) {return storagePrefix + "-" + topic + "-" + partition;}
}

4. 使用assign实现Exactly-once

使用assign实现Exactly-once 也很简单,具体思路如下:

1). 将enable.auto.commit设置为false。

2). 不调用consumer.commitSync()。

3). 调用assign注册kafka消费者到kafka

4). 初次启动的时候,调用consumer.seek(topicPartition,offset)来指定offset。

5). 在处理消息的时候,要同时控制保存住每个消息的offset。以原子事务的方式保存offset和处理的消息结果。传统数据库实现原子事务比较简单。但对于非传统数据库,比如hdfs或者nosql,为了实现这个目标,只能将offset与消息保存在同一行。

6). 实现密等,作为保护层。

代码如下:

public class ExactlyOnceStaticConsumer {private static OffsetManager offsetManager = new OffsetManager("storage1");public static void main(String[] str) throws InterruptedException, IOException {System.out.println("Starting ExactlyOnceStaticConsumer ...");readMessages();}private static void readMessages() throws InterruptedException, IOException {KafkaConsumer<String, String> consumer = createConsumer();String topic = "normal-topic";int partition = 1;TopicPartition topicPartition =registerConsumerToSpecificPartition(consumer, topic, partition);// Read the offset for the topic and partition from external storage.long offset = offsetManager.readOffsetFromExternalStore(topic, partition);// Use seek and go to exact offset for that topic and partition.consumer.seek(topicPartition, offset);processRecords(consumer);}private static KafkaConsumer<String, String> createConsumer() {Properties props = new Properties();props.put("bootstrap.servers", "localhost:9092");String consumeGroup = "cg2";props.put("group.id", consumeGroup);// Below is a key setting to turn off the auto commit.props.put("enable.auto.commit", "false");props.put("heartbeat.interval.ms", "2000");props.put("session.timeout.ms", "6001");// control maximum data on each poll, make sure this value is bigger than the maximum                 // single message sizeprops.put("max.partition.fetch.bytes", "140");props.put("key.deserializer",                                     "org.apache.kafka.common.serialization.StringDeserializer");props.put("value.deserializer",                                     "org.apache.kafka.common.serialization.StringDeserializer");return new KafkaConsumer<String, String>(props);}/*** Manually listens for specific topic partition. But, if you are looking for example of how to                * dynamically listens to partition and want to manually control offset then see* ExactlyOnceDynamicConsumer.java*/private static TopicPartition registerConsumerToSpecificPartition(KafkaConsumer<String, String> consumer, String topic, int partition) {TopicPartition topicPartition = new TopicPartition(topic, partition);List<TopicPartition> partitions = Arrays.asList(topicPartition);consumer.assign(partitions);return topicPartition;}/*** Process data and store offset in external store. Best practice is to do these operations* atomically. */private static void processRecords(KafkaConsumer<String, String> consumer) throws {while (true) {ConsumerRecords<String, String> records = consumer.poll(100);for (ConsumerRecord<String, String> record : records) {System.out.printf("offset = %d, key = %s, value = %s\n", record.offset(),                                                 record.key(), record.value());offsetManager.saveOffsetInExternalStore(record.topic(), record.partition(),                                                 record.offset());}}}
}

相关文章:

MQ:kafka-消费者的三种语义

文章目录 前言(一) 创建topic(二) 生产者&#xff08;三&#xff09;消费者1. At-most-once Kafka Consumer2. At-least-once kafka consumer3. 使用subscribe实现Exactly-once4. 使用assign实现Exactly-once 前言 本文主要是以kafka 09的client为例子&#xff0c;详解kafka c…...

中国1km分辨率SSP119情景(SSP119、SSP245 SSP585),模式逐月降水量数据集(2021-2100)

目录 简介 摘要 代码 引用 网址推荐 知识星球 机器学习 干旱监测平台 中国1km分辨率SSP119情景EC-Earth3模式逐月降水量数据集(2021-2100) 简介 该数据集为中国多情景多模式逐月降水量数据&#xff0c;空间分辨率为0.0083333&#xff08;约1km),时间为2021年1月-2100年…...

21天掌握javaweb-->第8天:前后端分离架构与Axios请求

前后端分离架构概念 前后端分离架构是一种现代Web应用开发模式,其中前端和后端分别独立开发和部署,通过API进行数据交互。这种架构使得前端专注于用户界面和用户体验,而后端则专注于业务逻辑和数据处理。 优势 开发效率高:前后端可以并行开发,减少了开发时间。技术栈灵活…...

基于阻塞队列的生产者消费者模型动画演示

一个基于阻塞队列的生产者消费者模型的动画演示&#xff1a; 这是打包好的程序。程序是用 QT 写的。 通过网盘分享的文件&#xff1a;CP模型.7z 链接: https://pan.baidu.com/s/1YjC7YiSqHGqdr6bbffaDWg?pwde6g5 提取码: e6g5 CP模型...

DHCP和BOOTP选项及DHCP协议操作详解

DHCP和BOOTP选项及DHCP协议操作详解 DHCP与BOOTP简介 1. BOOTP&#xff08;Bootstrap Protocol&#xff09; 功能&#xff1a;提供静态配置的IP分配。用途&#xff1a;在早期用于无盘工作站启动时获取IP地址和基本配置。缺点&#xff1a;只能提供静态IP配置&#xff0c;无法动…...

数据结构--链表和单链表详解及实现

一.前言 数据结构思维导图如下&#xff0c;灰色标记的是之前讲过的&#xff0c;本文将带你走近单链表(红色标记部分)&#xff0c;希望大家有所收获&#x1f339;&#x1f339; 二.链表的定义和概念 在讲单链表之前&#xff0c;我们先学习一下链表 2.1 链表的定义 链表是一种…...

vue3基础知识

书接上文&#xff0c;这篇继续来学习vue3的核心语法&#xff0c;可以先看上一篇再来看这篇效果更好。 1. computed computed 用于创建 计算属性&#xff0c;即基于其他响应式数据的值动态计算并缓存的属性。它的主要作用是优化性能和提高代码的可维护性&#xff0c;避免不必要…...

【Linux系统】Ubuntu 缓冲区机制

在Ubuntu中&#xff0c;和其他操作系统有个不一样的机制&#xff1a;缓冲区。这篇文章是对与缓冲区的详细介绍。 在 Ubuntu 中&#xff08;以及其他基于 Linux 的操作系统&#xff09;&#xff0c;缓冲区&#xff08;Buffer&#xff09;是内核用于优化 I/O 操作的重要机制。它…...

ChatGPT 最新推出的 Pro 订阅计划,具备哪些能力 ?

OpenAI 最近推出了 ChatGPT Pro&#xff0c;这是一个每月收费 200 美元的高级订阅计划&#xff0c;旨在为用户提供对 OpenAI 最先进模型和功能的高级访问。 以下是 ChatGPT Pro 的主要功能和能力&#xff1a; 高级模型访问&#xff1a; o1 模型&#xff1a;包括 o1 和 o1 Pro…...

数据结构理论

内容来源青岛大学数据结构与算法课程&#xff0c;链接&#xff1a;数据结构与算法基础&#xff08;青岛大学-王卓&#xff09;_哔哩哔哩_bilibili 绪论 数据结构概述 数据结构和算法的定义&#xff1a;我们如何把现实中大量而复杂的问题以特定的数据类型和特定的存储结构保存…...

es 3期 第14节-全文文本分词查询

#### 1.Elasticsearch是数据库&#xff0c;不是普通的Java应用程序&#xff0c;传统数据库需要的硬件资源同样需要&#xff0c;提升性能最有效的就是升级硬件。 #### 2.Elasticsearch是文档型数据库&#xff0c;不是关系型数据库&#xff0c;不具备严格的ACID事务特性&#xff…...

六安市第二届网络安全大赛复现

misc 听说你也喜欢俄罗斯方块&#xff1f; ppt拼接之后 缺三个角补上 flag{qfnh_wergh_wqef} 流量分析 流量包分离出来一个压缩包 出来一张图片 黑色代表0白色代表1 101010 1000 rab 反的压缩包 转一下 密码&#xff1a;拾叁拾陆叁拾贰陆拾肆 密文&#xff1a;4p4n5758…...

Sarcomere仿人灵巧手ARTUS,20个自由度拓宽机器人作业边界

Sarcomere Dynamics 是一家深度技术先驱&#xff0c;通过开发和商业化仿人机械来改变机器人行业。专注于为科研人员&#xff0c;系统集成商和制造商提供更实惠、更轻便且更灵活的末端执行器替代品。凭借创新的致动器技术&#xff0c;创造了一款紧凑、轻便且非常坚固的机械手Art…...

Django drf 基于serializers 快速使用

1. 安装: pip install djangorestframework 2. 添加rest_framework到您的INSTALLED_APPS设置。 settings.pyINSTALLED_APPS [...rest_framework, ] 3. 定义模型 models.pyfrom django.db import modelsclass BookModel(models.Model):name models.CharField(max_length64)…...

pycharm集成环境中关于安装sklearn库报错问题分析及解决

在输入pip install sklearn后&#xff0c;出现如下提示&#xff1a; pip install sklearn Collecting sklearn Using cached sklearn-0.0.post12.tar.gz (2.6 kB) Installing build dependencies ... done Getting requirements to build wheel ... error error: subprocess-…...

AI - 浅聊一下基于LangChain的AI Agent

AI - 浅聊一下基于LangChain的AI Agent 大家好&#xff0c;今天我们来聊聊一个很有意思的主题&#xff1a; AI Agent &#xff0c;就是目前非常流行的所谓的AI智能体。AI的发展日新月异&#xff0c;都2024年末了&#xff0c;如果此时小伙伴们对这个非常火的概念还不清楚的话&a…...

《【Linux】深入理解进程管理与 fork 系统调用的实现原理》

一、引言 在 Linux 操作系统中&#xff0c;进程管理是核心功能之一。进程是操作系统进行资源分配和调度的基本单位。理解进程管理的原理以及 fork 系统调用的实现对于深入掌握 Linux 系统的运行机制至关重要。本文将深入探讨 Linux 中的进程管理以及 fork 系统调用的实现原理&a…...

docker-compose部署skywalking 8.1.0

一、下载镜像 #注意 skywalking-oap-server和skywalking java agent版本强关联&#xff0c;版本需要保持一致性 docker pull elasticsearch:7.9.0 docker pull apache/skywalking-oap-server:8.1.0-es7 docker pull apache/skywalking-ui:8.1.0二、部署文件docker-compose.yam…...

AI 总结的的 AI 学习路线

一、入门阶段&#xff1a;数学基础与编程语言 数学基础 线性代数 当年白纸黑字推演&#xff0c; 都是泪啊&#xff0c;草稿本都用了一卷。 学习向量、矩阵的基本概念&#xff0c;包括向量的加法、减法、点积和叉积&#xff0c;矩阵的乘法、转置等运算。例如&#xff0c;在计算…...

离散傅里叶级数(DFS)详解

1. 引言 离散傅里叶级数&#xff08;Discrete Fourier Series, DFS&#xff09;是信号处理领域中一项基础且重要的数学工具&#xff0c;用于分析和处理周期性的离散信号。它通过将离散时间信号表示为一组正弦和余弦的和&#xff0c;从而使得信号在频域上得到更清晰的描述。与连…...

C++初阶-list的底层

目录 1.std::list实现的所有代码 2.list的简单介绍 2.1实现list的类 2.2_list_iterator的实现 2.2.1_list_iterator实现的原因和好处 2.2.2_list_iterator实现 2.3_list_node的实现 2.3.1. 避免递归的模板依赖 2.3.2. 内存布局一致性 2.3.3. 类型安全的替代方案 2.3.…...

练习(含atoi的模拟实现,自定义类型等练习)

一、结构体大小的计算及位段 &#xff08;结构体大小计算及位段 详解请看&#xff1a;自定义类型&#xff1a;结构体进阶-CSDN博客&#xff09; 1.在32位系统环境&#xff0c;编译选项为4字节对齐&#xff0c;那么sizeof(A)和sizeof(B)是多少&#xff1f; #pragma pack(4)st…...

理解 MCP 工作流:使用 Ollama 和 LangChain 构建本地 MCP 客户端

&#x1f31f; 什么是 MCP&#xff1f; 模型控制协议 (MCP) 是一种创新的协议&#xff0c;旨在无缝连接 AI 模型与应用程序。 MCP 是一个开源协议&#xff0c;它标准化了我们的 LLM 应用程序连接所需工具和数据源并与之协作的方式。 可以把它想象成你的 AI 模型 和想要使用它…...

【快手拥抱开源】通过快手团队开源的 KwaiCoder-AutoThink-preview 解锁大语言模型的潜力

引言&#xff1a; 在人工智能快速发展的浪潮中&#xff0c;快手Kwaipilot团队推出的 KwaiCoder-AutoThink-preview 具有里程碑意义——这是首个公开的AutoThink大语言模型&#xff08;LLM&#xff09;。该模型代表着该领域的重大突破&#xff0c;通过独特方式融合思考与非思考…...

linux 错误码总结

1,错误码的概念与作用 在Linux系统中,错误码是系统调用或库函数在执行失败时返回的特定数值,用于指示具体的错误类型。这些错误码通过全局变量errno来存储和传递,errno由操作系统维护,保存最近一次发生的错误信息。值得注意的是,errno的值在每次系统调用或函数调用失败时…...

土地利用/土地覆盖遥感解译与基于CLUE模型未来变化情景预测;从基础到高级,涵盖ArcGIS数据处理、ENVI遥感解译与CLUE模型情景模拟等

&#x1f50d; 土地利用/土地覆盖数据是生态、环境和气象等诸多领域模型的关键输入参数。通过遥感影像解译技术&#xff0c;可以精准获取历史或当前任何一个区域的土地利用/土地覆盖情况。这些数据不仅能够用于评估区域生态环境的变化趋势&#xff0c;还能有效评价重大生态工程…...

SpringTask-03.入门案例

一.入门案例 启动类&#xff1a; package com.sky;import lombok.extern.slf4j.Slf4j; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.cache.annotation.EnableCach…...

Maven 概述、安装、配置、仓库、私服详解

目录 1、Maven 概述 1.1 Maven 的定义 1.2 Maven 解决的问题 1.3 Maven 的核心特性与优势 2、Maven 安装 2.1 下载 Maven 2.2 安装配置 Maven 2.3 测试安装 2.4 修改 Maven 本地仓库的默认路径 3、Maven 配置 3.1 配置本地仓库 3.2 配置 JDK 3.3 IDEA 配置本地 Ma…...

GitFlow 工作模式(详解)

今天再学项目的过程中遇到使用gitflow模式管理代码&#xff0c;因此进行学习并且发布关于gitflow的一些思考 Git与GitFlow模式 我们在写代码的时候通常会进行网上保存&#xff0c;无论是github还是gittee&#xff0c;都是一种基于git去保存代码的形式&#xff0c;这样保存代码…...

深入理解Optional:处理空指针异常

1. 使用Optional处理可能为空的集合 在Java开发中&#xff0c;集合判空是一个常见但容易出错的场景。传统方式虽然可行&#xff0c;但存在一些潜在问题&#xff1a; // 传统判空方式 if (!CollectionUtils.isEmpty(userInfoList)) {for (UserInfo userInfo : userInfoList) {…...